1、ListState简介

Flink提供了三种基于key/value的state的实现方式,分别是:

ValueState<T>
ListState<T>
ReducingState<T>

官方文档关于state的使用方法,参考这里:Working with State

ListState需要将某些值存到一个List中(Iterable),意味着缓存的数据不只是一个而是多个值。很多情况下都可以使用,例如计算的数值要包含全天的每一个记录,此时只有将每个记录的值存成一个列表才可以计算。

2、ListState案例

首先需要先定义一个ListState,然后再重写KeyedProcessFunction中的open方法:

	private var itemState : ListState[ItemViewCount] = _

    override def open(parameters: Configuration): Unit = {

      //命名状态变量的名字和类型
      val itemStateDescription: ListStateDescriptor[ItemViewCount] = new ListStateDescriptor[ItemViewCount]("itemState", classOf[ItemViewCount])
      itemState = getRuntimeContext.getListState(itemStateDescription)
    }

ListStateDescriptor提供了几种不同的定义方式:
在这里插入图片描述
两个参数分别是ListStateDescriptor的名字和typeClass

3、总结

Flink提供了三种基于key/value的state接口,ListState接口适用于缓存多个值的计算。具体实现之前,因为state必须是基于key,且必须获取getRuntimeContext,state必须同时满足两个条件:

  • 直接基于keyedStream或者由keyedStream转换来的windowedStream
  • 必须继承RichFunction

实际实现时候,因为windowedStream在scala中不能实现RichWindowFunction,因此在main中使用flatmap间接实现了windowFunction中的功能:

val fromTransactionDataStream = watermarkTransaction
      .keyBy(_.code)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      
val transaction = fromTransactionDataStream
      .apply(new StockTransactionApply)
      .keyBy(_._3)
      .flatMap(new TransactionStateFlatMapFunction)

版权声明:本文为weixin_44240370原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/weixin_44240370/article/details/103134584