使用 JSONata 进行 JSON 事件转换¶
JSONata 简介¶
JSONata 是一种用于 JSON 数据的轻量级查询和转换语言。在 Knative EventTransform 中,JSONata 表达式允许您:
- 从事件数据中提取值
- 将数据字段提升为 CloudEvent 属性
- 重构事件负载
- 添加计算值
- 应用条件逻辑
基本用法¶
要在 EventTransform 资源中使用 JSONata,请在 spec.jsonata.expression 字段中指定表达式
apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
name: simple-transform
spec:
jsonata:
expression: |
{
"specversion": "1.0",
"id": id,
"time": time,
"type": "transformed.type",
"source": "transform.simple",
"data": data
}
CloudEvent 结构¶
JSONata 表达式的输入是整个 CloudEvent,包括其所有属性和数据。您的表达式必须生成一个有效的 CloudEvent,至少包含以下必填字段:
specversion: 应设置为 "1.0"id: 事件的唯一标识符type: 事件类型source: 事件源data: 事件负载
常见转换模式¶
保留原始事件结构¶
在添加或修改属性时保留原始事件结构
{
"specversion": "1.0",
"id": id,
"type": type,
"source": source,
"time": time,
"data": data,
"newattribute": "static value"
}
将字段提取为属性¶
从数据中提取字段并将其提升为 CloudEvent 属性
{
"specversion": "1.0",
"id": id,
"type": "user.event",
"source": source,
"time": time,
"userid": data.user.id,
"region": data.region,
"data": $
}
JSONata 中的 $ 符号表示整个输入对象,因此 data: $ 保留了完整的原始事件数据。
重构事件数据¶
完全重塑事件数据
{
"specversion": "1.0",
"id": order.id,
"type": "order.transformed",
"source": "transform.order-processor",
"time": order.time,
"orderid": order.id,
"data": {
"customer": {
"id": order.user.id,
"name": order.user.name
},
"items": order.items.{ "sku": sku, "quantity": qty, "price": price },
"total": $sum(order.items.(price * qty))
}
}
给定上述转换和以下 JSON 对象作为输入
{
"order": {
"time" : "2024-04-05T17:31:05Z",
"id": "8a76992e-cbe2-4dbe-96c0-7a951077089d",
"user": {
"id": "bd9779ef-cba5-4ad0-b89b-e23913f0a7a7",
"name": "John Doe"
},
"items": [
{"sku": "KNATIVE-1", "price": 99.99, "qty": 1},
{"sku": "KNATIVE-2", "price": 129.99, "qty": 2}
]
}
}
它将产生
{
"specversion": "1.0",
"id": "8a76992e-cbe2-4dbe-96c0-7a951077089d",
"type": "order.transformed",
"source": "transform.order-processor",
"time": "2024-04-05T17:31:05Z",
"orderid": "8a76992e-cbe2-4dbe-96c0-7a951077089d",
"data": {
"customer": {
"id": "bd9779ef-cba5-4ad0-b89b-e23913f0a7a7",
"name": "John Doe"
},
"items": [
{
"sku": "KNATIVE-1",
"quantity": 1,
"price": 99.99
},
{
"sku": "KNATIVE-2",
"quantity": 2,
"price": 129.99
}
],
"total": 359.97
}
}
条件转换¶
根据条件应用不同的转换
{
"specversion": "1.0",
"id": id,
"type": type = "order.created" ? "new.order" : "updated.order",
"source": source,
"time": time,
"priority": data.total > 1000 ? "high" : "normal",
"data": $
}
JSONata 高级特性¶
数组处理¶
JSONata 让处理事件数据中的数组变得容易
{
"specversion": "1.0",
"id": id,
"type": "order.processed",
"source": source,
"time": $now(),
"itemcount": $count(order.items),
"multiorder": $count(order.items) > 1,
"data": {
"order": order.id,
"items": order.items[quantity > 1].{
"product": name,
"quantity": quantity,
"lineTotal": price * quantity
},
"totalvalue": $sum(order.items.(price * quantity))
}
}
给定上述转换和以下 JSON 对象作为输入
{
"id": "12345",
"source": "https://example.com/orders",
"order": {
"id": "order-67890",
"items": [
{
"name": "Laptop",
"price": 1000,
"quantity": 1
},
{
"name": "Mouse",
"price": 50,
"quantity": 2
},
{
"name": "Keyboard",
"price": 80,
"quantity": 3
}
]
}
}
它将产生
{
"specversion": "1.0",
"id": "12345",
"type": "order.processed",
"source": "https://example.com/orders",
"time": "2025-03-03T09:13:23.753Z",
"itemcount": 3,
"multiorder": true,
"data": {
"order": "order-67890",
"items": [
{
"product": "Mouse",
"quantity": 2,
"lineTotal": 100
},
{
"product": "Keyboard",
"quantity": 3,
"lineTotal": 240
}
],
"totalvalue": 1340
}
}
使用内置函数¶
JSONata 提供了许多有用的函数
{
"specversion": "1.0",
"id": id,
"type": "user.event",
"source": source,
"time": time,
"timestamp": $now(),
"username": $lowercase(data.user.name),
"initials": $join($map($split(data.user.name, " "), function($v) { $substring($v, 0, 1) }), ""),
"data": $
}
转换回复¶
将 EventTransform 与 Sink 结合使用时,您还可以转换响应
apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
name: request-reply-transform
spec:
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: processor-service
jsonata:
expression: |
# Request transformation
{
"specversion": "1.0",
"id": id,
"type": "request.transformed",
"source": source,
"time": time,
"data": data
}
reply:
jsonata:
expression: |
# Reply transformation
{
"specversion": "1.0",
"id": id,
"type": "reply.transformed",
"source": "transform.reply-processor",
"time": time,
"data": data
}
最佳实践¶
-
始终生成有效的 CloudEvent:确保您的表达式包含所有必需的 CloudEvent 字段。
-
彻底测试表达式:使用 JSONata Exerciser 来验证复杂的表达式。
-
保持表达式可读性:在 YAML 中使用换行符和缩进,使表达式更易于阅读和维护。
-
处理缺失数据:使用
?运算符为可能缺失的字段提供默认值。 -
避免无限循环:当在 Broker 中使用回复功能时,请务必更改事件类型或添加过滤器以防止无限循环。
示例¶
用户注册事件转换器¶
apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
name: user-registration-transformer
spec:
sink:
ref:
apiVersion: eventing.knative.dev/v1
kind: Broker
name: default
jsonata:
expression: |
{
"specversion": "1.0",
"id": id,
"type": "user.registered.processed",
"source": "transform.user-processor",
"time": time,
"userid": data.user.id,
"region": data.region ? data.region : "unknown",
"tier": data.subscription.tier ? data.subscription.tier : "free",
"data": {
"userId": data.user.id,
"email": $lowercase(data.user.email),
"displayName": data.user.name ? data.user.name : $substring(data.user.email, 0, $indexOf(data.user.email, "@")),
"registrationDate": $now(),
"subscription": data.subscription ? data.subscription : { "tier": "free" }
}
}
订单处理事件转换器¶
apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
name: order-processor
spec:
jsonata:
expression: |
{
"specversion": "1.0",
"id": id,
"type": "order.processed",
"source": "transform.order-processor",
"time": time,
"orderid": data.id,
"customerid": data.customer.id,
"region": data.region,
"priority": $sum(data.items.(price * quantity)) > 1000 ? "high" : "standard",
"data": {
"orderId": data.id,
"customer": data.customer,
"items": data.items.{
"productId": productId,
"name": name,
"quantity": quantity,
"unitPrice": price,
"totalPrice": price * quantity
},
"total": $sum(data.items.(price * quantity)),
"tax": $sum(data.items.(price * quantity)) * 0.1,
"grandTotal": $sum(data.items.(price * quantity)) * 1.1,
"created": data.created,
"processed": $now()
}
}