近期的flink做业中,须要对上传的日志数据进行大量的校验。
校验规则大多比较简单,仅为字符串长度,数组长度,数据的最大值和最小值,非空判断等。然而不想写诸多校验代码,容易致使代码又丑又繁琐。联想SpringBoot项目中的参数校验,因而想着在纯maven的项目中引入校验。java
SpringBoot的基本参数校验是基于Hibernate Validator实现的,所以在pom中引入如下依赖:数组
<dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-validator</artifactId> <version>6.1.0.Final</version> </dependency> <dependency> <groupId>org.glassfish</groupId> <artifactId>javax.el</artifactId> <version>3.0.1-b11</version> </dependency> <dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-validator-cdi</artifactId> <version>6.1.0.Final</version> </dependency>
在须要验证的实体类中引入校验注解(不得不说,注解真香).多线程
public class LogEvent { @NotNull private Instant timestamp; @NotNull private String filepath; @NotNull @Length(min = 1, max = 64) private String region; @NotNull private Boolean status; @NotNull @Min(-1) @Max(60 * 1000) private Integer timeDelay; @NotNull @Length(min = 1, max = 64) private String target; @Length(max = 1024) private String message; @Size(max = 5) private List<String> tags; }
由于Validator是thread safe
实现,所以多线程中能够放心的使用。maven
@Slf4j public class LogEventUtil { // thread safe private static final Validator VALIDATOR = Validation.buildDefaultValidatorFactory().getValidator(); public static boolean validate(LogEvent event) { Set<ConstraintViolation<LogEvent>> constraintViolations = VALIDATOR.validate(event); if (!constraintViolations.isEmpty()) { return false; } // 此处省略若干复杂的校验规则(脏活不可能一点都不接触的) } }
经过VALIDATOR.validate便可实现对LogEvent的基本校验。ui
寥寥几笔,即完成数据读取以及校验。hibernate
private static DataStream<LogEvent> configureKafkaConsumer(final StreamExecutionEnvironment env, ParameterTool parameterTool) { String topic = parameterTool.get("kafka.topic", ""); Properties kafkaProperties = filterPrefix(parameterTool.getProperties(), "kafka"); return env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), kafkaProperties)) .map((MapFunction<String, LogEvent>) LogEventUtil::parseLogEvent) .filter((FilterFunction<LogEvent>) LogEventUtil::validate) .name("LogEventSourceStream") .returns(LogEvent.class); }