Introduction
In data analytics we often need to compute duration metrics, such as online time. Some of these are easy to compute; others are a little trickier, for example computing a user's online time from login and logout logs.
The window functions lead and lag make this very convenient: lead appends the value of a column from the nth following row to the current row, while lag appends the value from the nth preceding row.
lag(column, n, default)
lead(column, n, default)
The column parameter is the column whose value is pulled in; n is the number of rows to shift, usually just 1; default is the fallback value used when lag has no preceding row or lead has no following row.
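As a quick illustration, suppose a hypothetical single-partition table events has a time column holding the values 1, 2, 3:

select time,
       lag(time, 1, 0)  over (order by time) as prev_time,
       lead(time, 1, 0) over (order by time) as next_time
from events;

-- time | prev_time | next_time
--    1 |         0 |         2
--    2 |         1 |         3
--    3 |         2 |         0

The first row has no predecessor, so lag falls back to the default 0; the last row has no successor, so lead does the same.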
The key to using these two functions is partitioning and ordering.
select gid,
       lag(time, 1, '0')  over (partition by gid order by time) as lag_time,
       lead(time, 1, '0') over (partition by gid order by time) as lead_time
from table_name;
Partitioning is grouping: specify it with partition by, separating multiple columns with commas.
Ordering is specified with order by; multiple sort columns are likewise separated by commas.
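For example, a sketch with two hypothetical extra columns, app_id (a second grouping key) and seq (a tie-breaker), just to show the comma-separated syntax:

select gid,
       lead(time, 1, '0') over (partition by app_id, gid order by time, seq) as lead_time
from table_name;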
Combined, lead and lag can do far more than you might expect.
For example, to compute online time from login/logout logs, a rough-and-ready version is simple: partition by user id, order by time ascending, then use lead to append the next row's logout time to the current login row, and the duration is a straightforward subtraction.
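A minimal sketch of that naive version, assuming a result column where 0 marks a login and 1 a logout (the same convention as the full code below) and timestamps as positive epoch millis:

select gid,
       sum(lead_time - time) as online_ms
from (
    select gid, result, time,
           lead(time, 1, 0) over (partition by gid order by time) as lead_time
    from table_name
) t
where result = 0 and lead_time > 0
group by gid;

An unmatched trailing login gets the default lead_time of 0 and is dropped by the filter, but a login followed by another login is still counted as if it were a session.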
In practice, however, sessions can span midnight and log records can be lost, so we cannot assume that the first record is a login and the one after it is a logout.
By combining lead and lag, we can easily filter out such invalid records.
The code
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF6;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.junit.Before;
import org.junit.Test;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.LinkedList;
import java.util.List;

public class SparkLoginTimeTest implements Serializable {

    private SparkSession sparkSession;

    @Before
    public void setUp() {
        sparkSession = SparkSession
                .builder()
                .appName("test")
                .master("local")
                .getOrCreate();
    }

    // Generate 50 synthetic events: 5 users, alternating login/logout, 5 minutes apart.
    private static List<Info> getInfos() {
        String[] gids = {"10001", "10001", "10002", "10002", "10003",
                         "10003", "10004", "10004", "10005", "10005"};
        LocalDateTime base = LocalDateTime.of(2020, 1, 1, 0, 0, 0);
        LinkedList<Info> infos = new LinkedList<>();
        for (int i = 0; i < 50; i++) {
            Info info = new Info();
            info.setGid(gids[i % 10]);
            info.setResult(i % 2);
            info.setDate(base.plus(i * 5, ChronoUnit.MINUTES)
                    .toInstant(ZoneOffset.UTC).toEpochMilli());
            infos.add(info);
        }
        return infos;
    }

    @Test
    public void lag() {
        List<Info> infos = getInfos();
        sparkSession.udf().register("accTimes", accTimes(), DataTypes.LongType);
        Dataset<Info> dataset = sparkSession.createDataset(infos, Encoders.bean(Info.class));
        dataset.show(100);
        dataset.createOrReplaceTempView("temp");
        // Attach the previous and next event (result and timestamp) to every row,
        // using -1 as the default where no such row exists.
        String sql = "select gid,result,date,"
                + "lead(date,1,-1) over(partition by gid order by date) lead_date,"
                + "lead(result,1,-1) over(partition by gid order by date) lead_result,"
                + "lag(result,1,-1) over(partition by gid order by date) lag_result,"
                + "lag(date,1,-1) over(partition by gid order by date) lag_date"
                + " from temp";
        Dataset<Row> baseDs = sparkSession.sql(sql);
        Dataset<Row> rs = baseDs.withColumn("acc_times",
                functions.callUDF("accTimes",
                        baseDs.col("result"),
                        baseDs.col("date"),
                        baseDs.col("lead_result"),
                        baseDs.col("lead_date"),
                        baseDs.col("lag_result"),
                        baseDs.col("lag_date")))
                .groupBy("gid")
                .agg(functions.sum("acc_times").alias("accTimes")).na().fill(0)
                .select("gid", "accTimes");
        rs.show(100);
    }

    private static UDF6<Integer, Long, Integer, Long, Integer, Long, Long> accTimes() {
        return new UDF6<Integer, Long, Integer, Long, Integer, Long, Long>() {
            long dayMill = 86400000;

            @Override
            public Long call(Integer result, Long time, Integer headResult, Long headTime,
                             Integer lagResult, Long lagTime) {
                if (lagResult == -1) { // first row of the partition
                    if (result == 1) { // a logout: count from the start of that day
                        return time - (time / dayMill) * dayMill;
                    }
                }
                if (headResult == -1) { // last row of the partition
                    if (result == 0) { // a login: count until the end of that day
                        return (time / dayMill + 1) * dayMill - time;
                    }
                }
                if (result == 0 && headResult == 1) { // a login directly followed by a logout
                    long rs = headTime - time;
                    if (rs > 0) {
                        return rs;
                    }
                }
                return 0L; // everything else is invalid data and contributes nothing
            }
        };
    }

    public static class Info implements Serializable {
        /** unique user id */
        private String gid;
        /** login/logout timestamp in epoch millis */
        private Long date;
        /** 0 = login, 1 = logout */
        private Integer result;

        public Integer getResult() { return result; }
        public void setResult(Integer result) { this.result = result; }
        public String getGid() { return gid; }
        public void setGid(String gid) { this.gid = gid; }
        public Long getDate() { return date; }
        public void setDate(Long date) { this.date = date; }
    }
}
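With this synthetic data each gid gets five strictly alternating login/logout pairs spaced five minutes apart, so every user should accumulate 5 × 5 = 25 minutes and the final show() should report accTimes = 1500000 milliseconds per gid. Note that the first-row and last-row day-boundary branches of the UDF are not exercised by this particular data set, since every partition happens to start with a login and end with a logout.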