最近项目组准备开发一个IoT平台项目,须要使用到StreamSets DataCollector组件进行数据处理。
其中的一个Stage,产品经理设计了一个以下的配置界面:
前端
预期的展现效果是经过下拉“物实例”列表框的时候,根据所选择物实例的属性个数联动刷新“属性匹配”,并且物实例下拉框的数据是经过API获取的。java
这带来2个问题:git
实际上,单纯的下拉列表和联动刷新SDC是原生支持的,可是下拉列表的数据是静态配置的,并且联动刷新的界面也是预先配置的。而咱们的项目需求是须要根据下拉列表中选择的物实例属性个数进行联动刷新,而不一样的物实例的属性个数并不相同,所以没法作到预先配置。后端
因此,咱们的原型设计SDC原生并不能支持。
可是产品设计并不但愿修改,所以只能寻找对应的解决办法。api
对于下拉列表的数据从外部获取这个实现相对容易,在Stage中对于下拉列表的配置一般使用以下方式:浏览器
// 物实例下拉列表 @ConfigDef( required = true, type = ConfigDef.Type.MODEL, label = "Instance", defaultValue = "", displayPosition = 30, group = "DIGITALTWIN", description = "Instance List" ) @ValueChooserModel(DigitalTwinInstanceChooser.class) public String instance = null;
其中,DigitalTwinInstanceChooser类是数据源,它必须实现接口com.streamsets.pipeline.api.ChooserValues
,即必须实现以下接口方法:架构
public interface ChooserValues { String getResourceBundle(); List<String> getValues(); List<String> getLabels(); }
其中,方法getResourceBundle()是资源国际化配置参数,getValues()为下拉列表选项中各项对应的value,getLabels()为下拉列表选项中各项在界面上显示的key。所以,为了实现下拉列表数据从外部获取,只须要在实现了接口ChooserValues
的类构造方法中初始化对应数据便可,以下示例:框架
public class DigitalTwinInstanceChooser implements ChooserValues { private static List<String> values = null; private static List<String> labels = null; public DigitalTwinInstanceChooser() { // 只须要刷新一次 if(values != null) { return; } List<DigitalTwinInstance> list = DigitalTwinStreamClient.getDigitalTwinInstanceList(); setList(list); } // 数据初始化 public static void setList(List<DigitalTwinInstance> list) { if(list == null) { return; } values = new ArrayList<String>(list.size()); labels = new ArrayList<String>(list.size()); for(DigitalTwinInstance dtb : list) { if(dtb == null) { continue; } values.add(dtb.getId()+ ""); labels.add(dtb.getName()); } } @Override public String getResourceBundle() { return null; } @Override public List<String> getValues() { return values; } @Override public List<String> getLabels() { return labels; } }
在咱们的这个项目需求中是须要根据下拉选中的物实例属性个数动态刷新界面的,这个在SDC中原生并不支持。一开始我没有任何思路,组里熟悉这个框架的人问了一圈,没一我的解决过相似问题。其中一位同事告诉我以前一位已经离职的同事遇到过相似的需求,可是具体怎么实现的并不清楚,只是说好像经过修改前端来完成的。虽然这个信息没有直接解决个人问题,可是却给我打开了一点思路。咱们知道,在SDC的Stage配置中是实时保存的。SDC的前端使用AugularJS框架,只要用户配置参数发生了变化,就会实时经过API保存到后端,这样Stage在运行时就能获取到用户配置的对应参数。顺着这个思路,我对Stage保存参数的请求进行了抓包,通过对每一次保存请求参数和API接口的返回结果进行对比发现:前端每一次将保存参数经过API发送到后台进行保存以后会将该参数再返回给前端。因而我就脑洞大开:之因此须要将用户设置的参数再返回给前端,应该是前端须要这些参数进行界面渲染。那么,对于我这个需求,当用户选择了某个具体的物实例以后,是否能够在后端根据传递的物实例参数动态将对应的属性参数返回给前端,这样前端就能够动态渲染出相应的“属性匹配”界面了呢?可是这样的话就须要修改SDC保存Stage配置参数的源码了,报着试一试的心态因而开始了以下Hack实践。
第一步,找到保存Stage参数的API接口。在浏览器中能够看到,保存Stage配置参数的地址为:/rest/v1/pipeline/{pipelineid}
,因而凭直接找到了对应API接口类:datacollector\container\src\main\java\com\streamsets\datacollector\restapi\PipelineStoreResource.java,在该接口中有一个更新Pipeline的方法:ide
@Path("/pipeline/{pipelineId}") @POST @ApiOperation(value = "Update an existing Pipeline Configuration by name", response = PipelineConfigurationJson.class, authorizations = @Authorization(value = "basic")) @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) @RolesAllowed({ AuthzRole.CREATOR, AuthzRole.ADMIN, AuthzRole.CREATOR_REMOTE, AuthzRole.ADMIN_REMOTE }) public Response savePipeline( @PathParam("pipelineId") String name, @QueryParam("rev") @DefaultValue("0") String rev, @QueryParam("description") String description, @ApiParam(name="pipeline", required = true) PipelineConfigurationJson pipeline) throws URISyntaxException, PipelineException { if (store.isRemotePipeline(name, rev)) { throw new PipelineException(ContainerError.CONTAINER_01101, "SAVE_PIPELINE", name); } PipelineInfo pipelineInfo = store.getInfo(name); RestAPIUtils.injectPipelineInMDC(pipelineInfo.getTitle(), pipelineInfo.getPipelineId()); PipelineConfiguration pipelineConfig = BeanHelper.unwrapPipelineConfiguration(pipeline); PipelineConfigurationValidator validator = new PipelineConfigurationValidator(stageLibrary, name, pipelineConfig); pipelineConfig = validator.validate(); // 在这里判断并处理用户当前返回的配置 // 1.检查下拉菜单的值是否发生变化 if(checkDtInstanceChanged(pipelineInfo, rev, pipelineConfig)) { LOG.info("Need update DT Instance attribute!"); // 2.若是下拉菜单的值发生了变化才动态返回值 pipelineConfig = updateDigitalTwinConfig(pipelineConfig); } pipelineConfig = store.save(user, name, rev, description, pipelineConfig); return Response.ok().entity(BeanHelper.wrapPipelineConfiguration(pipelineConfig)).build(); }
第二步,在接口方法中根据需求实现对应的逻辑,动态返回下拉列表中选择物实例信息。ui
private boolean checkDtInstanceChanged(PipelineInfo pipelineInfo, String rev, PipelineConfiguration pipelineConfig) { try { // 加载存储的数据 PipelineConfiguration storePipeLine = store.load(pipelineInfo.getPipelineId(), rev); // 读取已经存储的配置参数 String storeDtInstance = getDtInstanceId(storePipeLine); // 读取前端发送过来的配置参数 String paramDtInstance = getDtInstanceId(pipelineConfig); LOG.info("Check DT Instance Changed, stored: {}, param: {}", storeDtInstance, paramDtInstance); return !storeDtInstance.equals(paramDtInstance); } catch (PipelineException e) { e.printStackTrace(); } return false; } // 从配置中读取参数 private String getDtInstanceId(PipelineConfiguration pipeline) { if(pipeline == null) { return ""; } List<StageConfiguration> stageList = pipeline.getStages(); for(StageConfiguration stage : stageList) { if(!"digitaltwin-stage".equals(stage.getLibrary())) { continue; } List<Config> configList = stage.getConfiguration(); for(Config config : configList) { if(!"config.instance".equals(config.getName())) { continue; } Object value = config.getValue(); if(value == null) { return ""; } return value.toString(); } } return ""; } // 实现动态更新前端界面 private PipelineConfiguration updateDigitalTwinConfig(PipelineConfiguration pipelineConfig) { List<StageConfiguration> stages = pipelineConfig.getStages(); if(stages == null || stages.isEmpty()) { return pipelineConfig; } for(StageConfiguration stage : stages) { if(stage == null) { continue; } if(!"digitaltwin-stage".equals(stage.getLibrary())) { continue; } String resourceURL = null; String instance = null; List<Config> configList = stage.getConfiguration(); int size = configList.size(); List<Config> newConfigList = new ArrayList<Config>(size); int index = -1; String key = "config.attrs"; for(int i = 0; i < size; i++) { Config config = configList.get(i); if(config == null) { continue; } if(key.equals(config.getName())) { index = i; newConfigList.add(null); continue; } if("config.resourceURL".equals(config.getName())) { resourceURL = config.getValue().toString(); } if("config.instance".equals(config.getName())) { instance = config.getValue().toString(); } newConfigList.add(config); } if((resourceURL == null || "".equals(resourceURL.trim()) || (instance == null || "".equals(instance.trim())))) { return pipelineConfig; } // 动态读取属性列表 List<DtConfig> list = DtStreamClient.getDtInstanceAttrList(resourceURL, instance); //newConfigList.add(new Config("config.attrs", list)); newConfigList.set(index, new Config(key, list)); stage.setConfig(newConfigList); // 强制前端刷新界面 /*Map<String, Object> uiInfo = stage.getUiInfo(); try { String key = "xPos"; Object xPos = uiInfo.get(key); if(xPos != null) { int x = Integer.valueOf(xPos.toString()); x += 1; uiInfo.put(key, x); } } catch (NumberFormatException e) { e.printStackTrace(); }*/ } return pipelineConfig; }
最后进行验证,居然成功了!!!
type = ConfigDef.Type.MAP
,以下:// 动态切换属性 @ConfigDef( required = false, type = ConfigDef.Type.MAP, label = "DigitalTwin Attribute Map", displayPosition = 40, group = "DIGITALTWIN" ) public Map<String, String> attrs = new HashMap<String, String>(); public Map<String, String> getAttrs() { return this.attrs; }