向 Broker 发送评论¶

我们书店的主仪表板有一个评论区,读者可以在其中查看他人的评论并通过文本输入区域提交自己的评论。虽然这个过程看起来很简单——点击一个按钮,评论就发布了——但其底层机制由强大的事件驱动架构提供支持。
我们将学习哪些 Knative 功能?¶
- Knative 事件驱动代理 (Broker)
- Knative 事件接收器 (Sink)
- Knative 接收器绑定 (SinkBinding)
交付成果看起来是怎样的?¶

简而言之:用户在前端点击提交按钮后,评论将显示在事件显示服务 (event-display service) 中。
实施¶
步骤 0:了解基础知识¶

在微服务和 REST API 的世界中,我们通常将请求视为服务之间通信的主要方式。然而,在事件驱动架构中,最小的单位是事件。Knative 事件驱动遵循 CloudEvents 规范,因此在继续之前了解这个概念至关重要。在继续之前,请在此处了解有关 CloudEvents 的更多信息!
步骤 1:理解图书评论服务¶

图书评论服务是我们的 Node.js API 服务器,在我们事件驱动架构中扮演着至关重要的角色。了解其运行方式非常重要,因为它接收事件并对其进行适当处理。

关键概念:Broker 和 SinkBinding
在我们深入研究代码之前,让我们澄清两个重要的概念
- Broker (代理/中介):在事件驱动架构中充当中心点,将事件路由到正确的目的地。
- SinkBinding (接收器绑定):此 Knative 事件驱动组件自动将 Broker 的地址注入到环境变量
K_SINK中,确保 Node.js 服务器始终拥有正确的地址,无需手动更新。
您将在后续过程中获得更深入的理解。

让我们检查 node-server/index.js 文件,从 /add 函数开始。当用户通过前端提交评论时,它首先被此端点接收。
app.post('/add', async (req, res) => {
try {
const receivedEvent = HTTP.toEvent({headers: req.headers, body: req.body});
const brokerURI = process.env.K_SINK;
if (receivedEvent.type === 'new-review-comment') {
const response = await fetch(brokerURI, {
method: 'POST',
headers: {
'Content-Type': 'application/cloudevents+json',
'ce-specversion': receivedEvent.specversion,
'ce-type': receivedEvent.type,
'ce-source': receivedEvent.source,
'ce-id': receivedEvent.id,
},
body: JSON.stringify(receivedEvent.data),
});
}
} catch (err) {
console.error(err);
}
});
接收事件:/add 端点从前端接收事件。它使用 CloudEvents 的 SDK 将传入的请求转换为 CloudEvent 对象
const receivedEvent = HTTP.toEvent({headers: req.headers, body: req.body});
确定 Broker 地址:Broker 的地址在集群中动态分配。Node.js 服务器从环境变量 K_SINK 中检索此地址
const brokerURI = process.env.K_SINK;
您可能会想,是谁告诉环境变量地址的?那就是 Knative SinkBinding。
事件过滤:服务检查事件类型。如果是 new-review-comment,它会将事件转发到 Broker
if (receivedEvent.type === 'new-review-comment') {
// logic that forwards the event, see below
}
转发事件逻辑:带有适当的 CloudEvent 标头的事件将被转发到 Broker
const response = await fetch(brokerURI, {
method: 'POST',
headers: {
'Content-Type': 'application/cloudevents+json',
'ce-specversion': receivedEvent.specversion,
'ce-type': receivedEvent.type,
'ce-source': receivedEvent.source,
'ce-id': receivedEvent.id,
},
body: JSON.stringify(receivedEvent.data),
});

探索其他函数
Node.js 服务器包含其他遵循类似模式的函数,其中详细的注释解释了它们的功能
/insert:接收 CloudEvents 并将有效载荷插入 PostgreSQL 数据库。/comment:与前端建立 WebSocket 连接,将评论从数据库传输到前端。
步骤 2:创建 Broker¶

Broker 在您的事件驱动应用程序中充当路由器,接收事件并将其路由到正确的目的地。
- 1:创建一个名为
node-server/config/200-broker.yaml的新 YAML 文件,并添加以下内容
node-server/config/200-broker.yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: bookstore-broker
- 2:应用 YAML 文件
kubectl apply -f node-server/config/200-broker.yaml
您将看到以下输出
broker.eventing.knative.dev/bookstore-broker created
或者,使用 Knative CLI kn 来创建 Broker
kn broker create bookstore-broker
您将看到以下输出
Broker 'bookstore-broker' successfully created in namespace 'default'.
验证
运行以下命令列出 Broker
kubectl get brokers
您应该看到 Broker bookstore-broker 的 READY 状态为 True。
NAME URL AGE READY REASON
bookstore-broker http://broker-ingress.knative-eventing.svc.cluster.local/default/bookstore-broker 7m30s True
故障排除
如果出现问题,请使用以下命令进行诊断
kubectl describe broker bookstore-broker
步骤 3:在 Node.js 服务器和 Broker 之间创建 SinkBinding¶

在应用程序中对 Kubernetes 服务进行硬编码的 URL 连接是有限制且不灵活的。SinkBinding 会动态地将 Kubernetes 服务的 URL 注入到您的应用程序中。
在此处了解有关 SinkBinding 的更多信息以及 规范架构!
创建 SinkBinding
- 1:创建一个名为
node-server/config/300-sinkbinding.yaml的新 YAML 文件,并添加以下内容
node-server/config/300-sinkbinding.yaml
apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
name: node-sinkbinding
spec:
subject:
apiVersion: apps/v1
kind: Deployment
selector:
matchLabels:
app: node-server
sink: # In this case, the sink is our Broker, which is the eventing service that will receive the events
ref:
apiVersion: eventing.knative.dev/v1
kind: Broker
name: bookstore-broker
- 2:应用 YAML 文件
kubectl apply -f node-server/config/300-sinkbinding.yaml
您将看到以下输出
sinkbinding.sources.knative.dev/node-sinkbinding created
验证
运行以下命令列出 sinkbindings
kubectl get sinkbindings
node-sinkbinding 的 READY 状态为 True。
NAME SINK AGE READY REASON
node-sinkbinding http://broker-ingress.knative-eventing.svc.cluster.local/default/bookstore-broker 2m43s True
步骤 4:创建事件显示服务 (event-display service)¶

事件显示是 Knative 事件驱动中的一个调试工具,它允许您将其用作事件的临时目标(也称为 sink)。
创建事件显示服务
- 1:创建一个名为
node-server/config/100-event-display.yaml的新 YAML 文件,并添加以下内容
node-server/config/100-event-display.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: event-display
spec:
replicas: 1
selector:
matchLabels:
app: event-display
template:
metadata:
labels:
app: event-display
spec:
containers:
- name: event-display
image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display
ports:
- containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
name: event-display
spec:
selector:
app: event-display
ports:
- protocol: TCP
port: 80
targetPort: 8080
type: ClusterIP
- 2:应用 YAML 文件
kubectl apply -f node-server/config/100-event-display.yaml
您将看到以下输出
deployment.apps/event-display created
service/event-display created
验证
运行以下命令列出 pod
kubectl get pods
您应该看到 pod event-display-XXXXXXX-XXXXX 处于“运行中”状态。
NAME READY STATUS RESTARTS AGE
bookstore-frontend-7b879ffb78-9bln6 1/1 Running 0 91m
event-display-55967c745d-bxrgh 1/1 Running 0 4m44s
node-server-644795d698-r9zlr 1/1 Running 0 4m43s
步骤 5:创建一个将 Broker 和事件显示连接起来的 Trigger¶

Trigger 能够根据 CloudEvent 的属性 将事件转发到正确的目的地。它是 Broker 和事件目标之间的连接器。
Trigger 中的 Filter (过滤器) 将根据过滤条件过滤事件。您将在 Trigger 的 YAML 文件中指定过滤条件。如果未指定过滤器,Trigger 将转发 Broker 接收到的所有事件。

Knative 中还有一个称为 Channel (通道) 的概念,通常来说,您可以将没有过滤器的 Broker & Trigger 视为等同于 Channel & Subscription。
在此处了解有关 Broker & Trigger 的更多信息!
创建 Trigger

在这里,我们创建一个 Trigger,它将把所有事件发送到事件显示服务。

- 1:创建一个名为
node-server/config/200-log-trigger.yaml的新 YAML 文件,并添加以下内容
node-server/config/200-log-trigger.yaml
# This Trigger subscribes to the Broker and will forward all the events that it received to event-display.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: log-trigger
spec:
broker: bookstore-broker
subscriber:
ref:
apiVersion: v1
kind: Service
name: event-display
- 2:应用 YAML 文件
kubectl apply -f node-server/config/200-log-trigger.yaml
您将看到以下输出
trigger.eventing.knative.dev/log-trigger created
验证
运行以下命令列出 Trigger
kubectl get triggers
log-trigger 的 READY 状态应为 True。
NAME BROKER SUBSCRIBER_URI AGE READY REASON
log-trigger bookstore-broker http://event-display.default.svc.cluster.local 6m2s True
验证¶

使用以下命令打开事件显示服务的日志
kubectl logs -l=app=event-display -f
验证
在 UI 的评论框中输入一些内容并点击提交按钮。评论应出现在事件显示服务中,并显示以下输出
☁️ cloudevents.Event
Validation: valid
Context Attributes,
specversion: 1.0
type: new-review-comment
source: bookstore-eda
id: unique-comment-id
datacontenttype: application/json
Extensions,
knativearrivaltime: 2024-05-19T05:27:36.232562628Z
Data,
{
"reviewText": "test"
}
下一步¶

请确保您通过了验证测试,然后再继续。