让决策更智能
新一代智能数据分析平台

如何计算连续发生天数

观小远发表于:2022年03月10日 17:50:39更新于:2022年03月10日 18:16:37

场景介绍

    某一事件可能会存在发生或者不发生的情况,系统中会按日记录这一事件的发生状态,如事件发生的情况下,会记录正常,不发生则记录异常。

    而我们希望知道事件的连续发生的情况,当事件为正常状态,则记录正常状态的持续天数,如某天发生异常,则终止计数,从下次正常状态重新计数。

数据结构示例:

需要根据事件A对应日期的状态,来计算连续发生的天数。(红色列为希望通过ETL处理后获得的数据)

事件名称日期状态连续发生天数
任务A2022/1/1正常1
任务A2022/1/2正常2
任务A2022/1/3正常3
任务A2022/1/4正常4
任务A2022/1/5正常5
任务A2022/1/6异常0
任务A2022/1/7正常1
任务A2022/1/8正常2
任务A2022/1/9异常0
任务A2022/1/10正常1
任务A2022/1/11正常2
任务A2022/1/12正常3
任务A2022/1/13正常4
任务A2022/1/14异常0
任务A2022/1/15正常1
任务A2022/1/16异常0
任务A2022/1/17正常1
任务A2022/1/18异常0


实现方法

STEP1:

    数据处理,将事件状态转换成0/1,方便后续计算时间发生的连续天数。

    可以使用值替换功能,处理状态。

    正常——>1,异常——>0

image.png


STEP2:

    在ETL中,使用struct函数将日期和事件状态数据构建成struct数据结构,使用collect_list函数将struct转变成array,最后使用array_sort函数对数组按日期进行排序;

    使用sql输入节点处理。

    

SELECT input1.`任务名称`,

array_sort(collect_list(struct(input1.`日期` as `日期`, input1.`状态` as `状态`))) as `发生记录`

from input1 

group by `任务名称`



STEP3:

添加计算字段,使用transform函数获得日期是否发生数据的数组;

日期列表    transform([发生记录],x-> x.`日期`)

是否发生列表    transform([发生记录],x-> x.`状态`)

image.png


STEP4:

    添加计算字段,使用aggregate函数计算事件发生持续天数。根据「是否发生列表」数组中的元素来判断,如果不发生(x=0),则重新开始累积;如果发生,则将aggregate函数返回的前一个值(acc[size(acc)-1])和当前元素(x)相加,从而得到累计值。最终返回一个初始值为0的发生持续时间数组。

发生持续时间    aggregate([是否发生列表], array(cast(0 as double)), (acc,x)-> concat(acc, array(If(x=0,0, acc[size(acc)-1] + x))))

    但由于此时的数组是包含初始值的,而实际我们希望查询持续天数是,是不包含初始值的。所以还需要添加计算字段,使用slice函数从第2位截取“缺货持续时间”数组,不包含初始值0;(此处新字段记为「发生持续时间2」

发生持续时间    slice([发生持续时间],2,size([发生持续时间])-1)

image.png


STEP5:

  • 添加计算字段,使用时序UDF构建持续发生天数查询表

持续发生天数查询表    Date_range_build([日期列表],[发生持续时间2])


STEP6:

  • 添加计算字段,使用explode函数将日期列表展开;

日期    explode([日期列表])


STEP7:

    添加计算字段,使用时序UDF查询发生持续天数;

发生持续天数    date_range_get([持续发生天数查询表],[日期])


STEP8:

    选择列,仅保留需要的列信息。

image.png

注:文中标黄字段名称可按照实际需要替换成您实际数据的对应字段名。


扩展阅读:

更多时序UDF函数功能可参考文档:时序UDF的介绍和使用方法

    您需要登录后才可以回复