白皮书
车联网设计与实现:搭建可靠、高效、符合行业需求的车联网平台 →

eKuiper Newsletter 2022-06|离线缓存重发机制升级,优化弱网场景使用

eKuiper Team
2022-7-1
eKuiper Newsletter 2022-06|离线缓存重发机制升级,优化弱网场景使用

六月的盛夏时节正是 eKuiper 项目捐献给 LF Edge 基金会一周年之时。六月初,项目圆满完成了在基金会的第一次年度 review,并确立了下一年度升级到 Stage 2 的目标。在此我们衷心感谢各位社区贡献者、合作伙伴和用户,期待新的一年能有更多伙伴加入到社区的建设中。

我们的开发工作也取得了不错的进展。月初,小版本 1.5.1 发布,主要解决了一些用户问题。在 1.6.0 版本开发方面,我们完成了离线缓存和重发机制的升级,更适应边缘部署中常见的边云网络连接易丢失的弱网场景。与此同时,我们补齐了一些 SQL 语法支持,包括 IN/NOT IN 表达式的支持、ORDER BY 对表达式和别名的支持等,方便用户编写更复杂的过滤和排序逻辑。最后,可视化拖拽能力的开发目前已完成后台 API 的部分验证。

离线缓存和重发

大数据时代,云边协同是主流的计算模式。边缘计算的一部分结果需要发送到云端进行进一步的整合。然而边云之间的网络连接常常是不稳定的,网络连接故障时有发生。作为边缘流式计算引擎,eKuiper 经常有规则将计算结果汇入外部系统,尤其是远程的外部系统中。这种情况下,我们需要考虑弱网环境的处理:在网络断开等故障期间,必须对数据进行缓存,并在重新连接后重新发送。

此前,eKuiper 在一定程度上支持 sink 缓存。它提供了一个全局配置来切换缓存开启;系统/规则级配置用于内存缓存的序列化时间间隔。然而,缓存只是在内存中和复制到 DB(内存的镜像)中,并且没有定义明确的重发策略。六月,我们对缓存机制进行了优化,缓存将同时保存在内存和磁盘中,这样缓存的容量就变得更大了;它还将持续检测故障恢复状态,并在不重新启动规则的情况下实现自动重新发送。

流程

缓存只发生在 sink 中,因为那是 eKuiper 之外唯一可以发送数据的地方。每个 sink 都可以配置自己的缓存机制。每个 sink 的缓存流程是相同的。如果启用了缓存,所有 sink 的事件都会经过两个阶段:首先是将所有内容保存到缓存中;然后在收到 ack 后删除缓存。

  • 错误检测:发送失败后,sink 应该通过返回特定的错误类型来识别可恢复的失败(网络等),这将返回一个失败的 ack,这样缓存就可以被保留下来。对于成功的发送或不可恢复的错误,将发送一个成功的 ack 来删除缓存。
  • 缓存机制:缓存将首先被保存在内存中。如果超过了内存的阈值,后面的缓存将被保存到磁盘中。一旦磁盘缓存超过磁盘存储阈值,缓存将开始 rotate。内存中最早的缓存将被丢弃,并加载磁盘中最早的缓存来代替。
  • 重发策略:如果有一个 ack 正在发送中,则等待一个成功的 ack 以继续发送下个缓存数据。否则,当有新的数据到来时,发送缓存中的第一个数据以检测网络状况。如果 ack 成功,按顺序链式发送所有的缓存(mem + disk)。链式发送可定义一个发送间隔,防止形成消息风暴。

配置

sink 缓存的配置有两个层次。etc/kuiper.yaml 中的全局配置,定义所有规则的默认行为。还有一个规则 sink 层的定义,用来覆盖默认行为。

  • enableCache:是否启用 sink cache。缓存存储配置遵循 etc/kuiper.yaml 中定义的元数据存储的配置。
  • memoryCacheThreshold:要缓存在内存中的消息数量。出于性能方面的考虑,最早的缓存信息被存储在内存中,以便在故障恢复时立即重新发送。这里的数据会因为断电等故障而丢失。
  • maxDiskCache:缓存在磁盘中的信息的最大数量。磁盘缓存是 FIFO。如果磁盘缓存满了,最早的一页信息将被加载到内存缓存中,取代旧的内存缓存。
  • bufferPageSize:缓冲页是批量读/写到磁盘的单位,以防止频繁的IO。如果页面未满,eKuiper因硬件或软件错误而崩溃,最后未写入磁盘的页面将被丢失。
  • resendInterval:故障恢复后重新发送信息的时间间隔,防止信息风暴。
  • cleanCacheAtStop:是否在规则停止时清理所有缓存,以防止规则重新启动时对过期消息进行大量重发。如果不设置为 true,一旦规则停止,内存缓存将被存储到磁盘中。否则,内存和磁盘规则会被清理掉。

目前,该功能的代码已经合并到 1.6.0 版本的分支(https://github.com/lf-edge/ekuiper/tree/1.6.0)中。感兴趣的朋友可以自行编译使用。

列表过滤

在规则引擎中,我们经常需要判断某个值是否在一个列表中,从而触发相应的动作。在标准 SQL 语法中,通常使用 IN/NOT IN 表达式进行这样的过滤。本月,我们实现了 IN 运算符的支持。使用方法支持以下两种:

  1. 与标准 SQL 语法相同,支持同时设置多个表达式。

    expression [NOT] IN (expression2,...n)
    
  2. 在 eKuiper 的使用场景中,复杂类型和无模式使用较多,因此也支持直接使用表达式(需要确保为数组类型)作为右侧运算符。

    expression [NOT] IN arrayExpression
    

1.5.1 版本

月初发布的 1.5.1 版本主要解决问题和小功能更新。主要的功能更新包括:

  • 扩大数据模板的支持范围。新的版本中添加了 memory sink, edegX sink, tdengine sink 等对数据模板的支持。
  • 支持 window_start() 和 window_end() 作为其他函数的参数。

解决的 bug 包括:

  • 重启规则后,Neuron 连接失败问题
  • 插件更新导致规则语法错误时,已运行规则的状态异常问题
  • 使用共享源时,重启规则可能随机导致连接失败
  • REST API 使用鉴权后的跨域访问问题

推荐阅读