The base environment for this article is the same as in the flink 1.10.1 Java WordCount demo (nc + socket).
Static data is used here to simulate merchant order data. When an order record arrives, it is keyed by merchant (shop) id and then aggregated per day.
1. Main program code
package com.demo.statis;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import javax.annotation.Nullable;
public class ShopDayGMVStatis {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        // Use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Read the data from a file
        String inputPath = "data/shoporder.txt";
        DataStream<String> inputDataSet = env.readTextFile(inputPath);
        // Parse each line into a ShopOrder and assign event timestamps
        DataStream<ShopOrder> dataStream = inputDataSet.map(new MapFunction<String, ShopOrder>() {
            @Override
            public ShopOrder map(String s) throws Exception {
                String[] splits = s.split(",");
                // The second field is the pay time in seconds; convert it to milliseconds
                long time = Long.parseLong(splits[1]);
                return new ShopOrder(splits[0], time * 1000, Long.parseLong(splits[2]));
            }
        })
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<ShopOrder>() {
            @Override
            public long extractTimestamp(ShopOrder shopOrder, long l) {
                return shopOrder.getPayTime();
            }

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                // No watermark is emitted; the KeyedProcessFunction below does not use event-time timers
                return null;
            }
        });
        DataStream<ShopDayGmv> result = dataStream.keyBy(ShopOrder::getShopId)
                .process(new ShopDayKeyProcess());
        result.print("shop day statistics");
        env.execute();
    }
}
The statistics are bucketed by the calendar date of each order's payment time.
2. Statistics code
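Note that getCurrentWatermark() above returns null, so no watermarks are actually emitted; the job still works because the KeyedProcessFunction used here relies on neither event-time timers nor windows. If timers or windows are added later, one option available in Flink 1.10 is a bounded-out-of-orderness extractor. The sketch below is only an illustration; the 5-second bound is an assumed value, not something from the original code.
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
// Drop-in replacement for the anonymous AssignerWithPeriodicWatermarks above.
// The 5-second out-of-orderness bound is an assumption for illustration only.
DataStream<ShopOrder> withWatermarks = dataStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<ShopOrder>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(ShopOrder shopOrder) {
                return shopOrder.getPayTime(); // payTime is already in milliseconds
            }
        });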
package com.demo.statis;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.Date;
public class ShopDayKeyProcess extends KeyedProcessFunction<String, ShopOrder, ShopDayGmv> {
    // MapState key: dayIndex + type, value: amount. For example, key "18581Sales" holds the sales total for 2020-11-15
    private MapState<String, Long> dayAmount;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // TTL policy (note: some of these options behave slightly differently depending on the Flink setup)
        StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(3))
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .cleanupInRocksdbCompactFilter(3000)
                .build();
        MapStateDescriptor<String, Long> amountStateDesc = new MapStateDescriptor<>("amountState", TypeInformation.of(String.class), TypeInformation.of(Long.class));
        // Enable TTL on the state descriptor
        amountStateDesc.enableTimeToLive(stateTtlConfig);
        // Obtain the MapState
        dayAmount = getRuntimeContext().getMapState(amountStateDesc);
    }
    @Override
    public void processElement(ShopOrder value, Context context, Collector<ShopDayGmv> collector) throws Exception {
        final ShopDayGmv shopDayGmv = new ShopDayGmv();
        shopDayGmv.setShopId(value.getShopId());
        shopDayGmv.setTime(value.getPayTime());
        long payTime = value.getPayTime();
        // payTime is in milliseconds; the formula below yields a unique index per calendar day
        // 24 * 60 * 60 * 1000 = 86400000 ms = 1 day
        // 8 * 60 * 60 * 1000 = 28800000 ms = 8 hours, shifting the day boundary to GMT+8
        long dateNum = (payTime + 28800000L) / 86400000L;
        shopDayGmv.setTimeKey(timeFormat(payTime));
        // Read the running totals for this day from state
        Long dateSalesState = dayAmount.get(dateNum + "Sales");
        Long dateCountState = dayAmount.get(dateNum + "Count");
        // Sales amount
        if (dateSalesState == null) {
            // No state yet: this is the first order of the day, so sales = price
            dayAmount.put(dateNum + "Sales", value.getOrderPrice());
        } else {
            dayAmount.put(dateNum + "Sales", value.getOrderPrice() + dateSalesState);
        }
        // Order count
        if (dateCountState == null) {
            // No state yet: this is the first order of the day, so count = 1
            dayAmount.put(dateNum + "Count", 1L);
        } else {
            dayAmount.put(dateNum + "Count", 1L + dateCountState);
        }
        // Emit the updated daily order count and sales
        shopDayGmv.setTodayCount(dayAmount.get(dateNum + "Count"));
        shopDayGmv.setTodaySales(dayAmount.get(dateNum + "Sales"));
        collector.collect(shopDayGmv);
    }
    public static String timeFormat(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd").format(new Date(timeStamp));
    }
}
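To sanity-check the millisecond-based day-index arithmetic used in processElement, here is a small standalone check. It is not part of the job; the class name DayIndexCheck is made up for illustration and assumes it lives in the same package as ShopDayKeyProcess. The input value is the first timestamp from the test data in section 5.
// Standalone check of the day-index formula (illustration only)
public class DayIndexCheck {
    public static void main(String[] args) {
        long payTime = 1644479447000L;                      // first record of the test data, in milliseconds
        long dateNum = (payTime + 28800000L) / 86400000L;   // shift to GMT+8, then count whole days
        System.out.println(dateNum);                        // prints 19033, the day index for 2022-02-10
        System.out.println(ShopDayKeyProcess.timeFormat(payTime)); // prints 2022-02-10 (JVM default timezone)
    }
}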
With this state TTL configuration, each merchant's per-day statistics are kept in state for only 3 days, which prevents the state from growing indefinitely and eventually exhausting memory.
3. Helper code: ShopOrder
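Note that cleanupInRocksdbCompactFilter only takes effect with the RocksDB state backend. If the job runs on a heap-based state backend, incremental cleanup is one alternative; a minimal sketch is shown below, where the batch size of 100 is an assumed value rather than anything from the original article.
// Sketch of a TTL config for heap-based state backends (batch size 100 is an assumption)
StateTtlConfig heapTtlConfig = StateTtlConfig.newBuilder(Time.days(3))
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .cleanupIncrementally(100, false)   // clean up to 100 entries on each state access
        .build();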
package com.demo.statis;
public class ShopOrder {
    private String shopId;
    private Long payTime;
    private Long orderPrice;

    public ShopOrder() {
    }

    public ShopOrder(String shopId, Long payTime, Long orderPrice) {
        this.shopId = shopId;
        this.payTime = payTime;
        this.orderPrice = orderPrice;
    }

    public String getShopId() {
        return shopId;
    }

    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    public Long getPayTime() {
        return payTime;
    }

    public void setPayTime(Long payTime) {
        this.payTime = payTime;
    }

    public Long getOrderPrice() {
        return orderPrice;
    }

    public void setOrderPrice(Long orderPrice) {
        this.orderPrice = orderPrice;
    }

    @Override
    public String toString() {
        return "ShopOrder{" +
                "shopId='" + shopId + '\'' +
                ", payTime=" + payTime +
                ", orderPrice=" + orderPrice +
                '}';
    }
}
4. Helper code: ShopDayGmv
package com.demo.statis;
public class ShopDayGmv {
    private String shopId;
    private Long time;
    private Long todayCount;
    private Long todaySales;
    private String timeKey;

    public ShopDayGmv() {
    }

    public ShopDayGmv(String shopId, Long time, Long todayCount, Long todaySales, String timeKey) {
        this.shopId = shopId;
        this.time = time;
        this.todayCount = todayCount;
        this.todaySales = todaySales;
        this.timeKey = timeKey;
    }

    public String getShopId() {
        return shopId;
    }

    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    public Long getTime() {
        return time;
    }

    public void setTime(Long time) {
        this.time = time;
    }

    public Long getTodayCount() {
        return todayCount;
    }

    public void setTodayCount(Long todayCount) {
        this.todayCount = todayCount;
    }

    public Long getTodaySales() {
        return todaySales;
    }

    public void setTodaySales(Long todaySales) {
        this.todaySales = todaySales;
    }

    public String getTimeKey() {
        return timeKey;
    }

    public void setTimeKey(String timeKey) {
        this.timeKey = timeKey;
    }

    @Override
    public String toString() {
        return "ShopDayGmv{" +
                "shopId='" + shopId + '\'' +
                ", time=" + time +
                ", todayCount=" + todayCount +
                ", todaySales=" + todaySales +
                ", timeKey='" + timeKey + '\'' +
                '}';
    }
}
5. Test data
001,1644479447,10
001,1644479447,20
001,1644479445,10
001,1644479446,30
001,1644479447,20
001,1644479444,80
001,1644479445,10
002,1641801046,10
003,1641801056,30
004,1641801066,10
6. Running the program
Running the job produces the following output:
shop day statistics> ShopDayGmv{shopId='001', time=1644479447000, todayCount=1, todaySales=10, timeKey='2022-02-10'}
shop day statistics> ShopDayGmv{shopId='001', time=1644479447000, todayCount=2, todaySales=30, timeKey='2022-02-10'}
shop day statistics> ShopDayGmv{shopId='001', time=1644479445000, todayCount=3, todaySales=40, timeKey='2022-02-10'}
shop day statistics> ShopDayGmv{shopId='001', time=1644479446000, todayCount=4, todaySales=70, timeKey='2022-02-10'}
shop day statistics> ShopDayGmv{shopId='001', time=1644479447000, todayCount=5, todaySales=90, timeKey='2022-02-10'}
shop day statistics> ShopDayGmv{shopId='001', time=1644479444000, todayCount=6, todaySales=170, timeKey='2022-02-10'}
shop day statistics> ShopDayGmv{shopId='001', time=1644479445000, todayCount=7, todaySales=180, timeKey='2022-02-10'}
shop day statistics> ShopDayGmv{shopId='002', time=1641801046000, todayCount=1, todaySales=10, timeKey='2022-01-10'}
shop day statistics> ShopDayGmv{shopId='003', time=1641801056000, todayCount=1, todaySales=30, timeKey='2022-01-10'}
shop day statistics> ShopDayGmv{shopId='004', time=1641801066000, todayCount=1, todaySales=10, timeKey='2022-01-10'}
As the output shows, the data has been aggregated by merchant id and date. In this test the results are only printed to the console; in a real project they can be written to Kafka, Redis, a database, or another sink as the project requires (a Kafka sink sketch follows below).
Also note that every input record produces one corresponding output record. If the input volume is high, the output volume grows accordingly, so an appropriate sink needs to be chosen.
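As one example of swapping the console sink for Kafka, the sketch below assumes the flink-connector-kafka dependency is on the classpath; the broker address localhost:9092 and the topic name shop-day-gmv are placeholders, and serializing the result with toString() is only for demonstration.
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
// Replace result.print(...) in the main program with a Kafka sink (illustration only)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker address
result.map(ShopDayGmv::toString)
        .addSink(new FlinkKafkaProducer<>("shop-day-gmv", new SimpleStringSchema(), props));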
Copyright notice: This is an original article by liaomingwu, licensed under CC 4.0 BY-SA. Please include a link to the original source and this notice when reposting.