在刚刚过去的2018年,“毒疫苗”事件再次触及了大众的敏感神经,由于十年前的“毒奶粉”事件还历历在目。咱们急需建立一个全国性的药品(食品)监控追踪体系。与此同时,近年来随着国家对医药行业的大力支持,中国的医疗事业也出现了跨越式的发展,大量的新型药品上市,极大的丰富了患者和消费者的选择范围。大量的药品在市面上流通,产生了大量的状态数据,且这类数据在爆发式的增加。如何高效的存储和溯源药品状态数据已经成为一个行业难题。传统方案经常采用好比MySQL数据库分库分表的方式,可是这个方案在开发、运维、可扩展性都有很多弊端。数据库
业界开始愈来愈多的使用分布式的NoSQL方案来解决大数据的问题。好比阿里健康基于表格存储(Tablestore)推出了“码上放心” 药品监管码查询功能,解决了大众的药品查询需求。这仅仅是第一步,创建一个完善全国性药品追踪体系是一个艰巨而漫长的任务。借用网上的一句话,最终咱们要实现药品的:“来源可查,去向可追,责任可究”。并发
图1 码上放心 溯源截图运维
在整个药品监管体系中,药品自己的管理和药品轨迹溯源是药品监管体系的两大核心功能,本篇文章主要是介绍使用表格存储的Timestream模型快速高效的实现这两类功能。async
药品的元数据是指药品在上市以前的在国家药品监督管理局(CFDA)备案信息,记录了药品名称、分类、成分、批次、临床一期、二期、N期测试数据、自研或进口等详细信息,多达几十个字段。分布式
图2 药品元数据ide
用户会经过页面或者APP的方式浏览和查询药品信息,这须要应用提供多种组合的查询方式,好比:性能
上面只是列举的一些典型查询场景,药品备案信息中拥有大量的字段,使用者会从多个查询维度查询数据。所以在保证性能的前提下,提供丰富的查询功能成为元数据管理的主要技术难点。测试
药品的状态数据是指药品在生产、流经过程中产生的状态数据,好比药品的原材料流通、药企生产药品过程当中的状态、运输过程的轨迹、医院药店存储和使用数据等。fetch
图3 常见状态数据大数据
药品流通会产生大量的状态数据,这些数据须要持续的记录下来,后续才能够作到真正的药品溯源。咱们先来罗列一下药品状态数据:
从上面的数据来源可知,一盒简单的药品在到送到患者手上以前,会有大量的流通环节,每一个环节都会产生大量的状态数据。同时,中国市场药品的规模在万亿人民币级别,而且伴随每一年有将近一成的增加,是全球第二大医药市场。要知足如此巨大的规模下的状态数据的存储,极高的写入吞吐、海量存储规模、可控的存储成本成为必需要解决的问题。
图4 MySQL分库分表 vs Tablestore
从对药品元数据管理和状态数据溯源的总结可知,要知足以上的功能和性能需求,单机已经没法知足要求,须要使用分布式的方案。通常传统的方案会采用MySQL分库分表的方案,可是这个方案在实际生产和运维中面临很多问题,好比:
总结来看,从理论上能知足以上的功能需求,可是要想真正在生产中使用和维护好这套存储系统,只能说“想爱你并不容易”。在这种大数据的OLTP的场景下,业界通常选用分布式的NoSQL方案。所以咱们推荐使用Tablestore一站式的解决以上问题。Tablestore是一款阿里自研的分布式NoSQL服务,提供多元索引支持丰富的查询需求,支撑超大规模的并发访问和低延迟的性能,能够很好的解决药品元数据管理和溯源的需求。
Timestream是表格存储推出的最新数据模型,这个模型针对时序数据、轨迹数据、溯源数据,定义了一套简单清晰易用的API,细节能够参考《Tablestore Timestream:为海量时序数据存储设计的全新数据模型》。
在咱们列举的药品监管场景中,药品的元数据能够很是简单的抽象为Timestream的元数据(Meta),状态数据抽象为Timestream的Data数据。本文做为一个实战文章,所以使用Timestream模型来快速高效的实现以上两个功能。
从上面的Timestream介绍文章可知,Timestream拥有几个核心概念,分别是:Name, Tag, Attribute, Timestamp, Point(Fields)。咱们罗列一个表格,展现怎么将药品的相关数据映射到Timestream的模型中,如图所示:
图5 模型转换图
接下来咱们经过一个能够运行的Demo,向你们展现怎么使用Timestream API实现元数据管理和溯源功能。
写入
查询
<dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>tablestore</artifactId> <version>4.11.2</version> </dependency>
对于一些固定且有特殊索引需求的字段,咱们在建立Meta表的时候须要单独指定,好比“生产日期”、地理信息、状态数据等。
考虑到后面的扩展需求,咱们增长一个扩展字段,“extension”,用于存储未定义的元数据。
如下示例只是给了部分元数据字段,用户能够根据本身的需求设置更多的索引字段。
public void createMetaTable() { List<AttributeIndexSchema> index = new ArrayList<AttributeIndexSchema>(); index.add(new AttributeIndexSchema("produced_date", AttributeIndexSchema.Type.LONG)); index.add(new AttributeIndexSchema("period_of_validity", AttributeIndexSchema.Type.LONG)); index.add(new AttributeIndexSchema("loc", AttributeIndexSchema.Type.GEO_POINT)); index.add(new AttributeIndexSchema("links", AttributeIndexSchema.Type.KEYWORD)); index.add(new AttributeIndexSchema("status", AttributeIndexSchema.Type.KEYWORD)); index.add(new AttributeIndexSchema("extension", AttributeIndexSchema.Type.KEYWORD).setIsArray(true)); db.createMetaTable(index); }
这个比较简单,只须要设定表名便可。由于咱们是Schema Free的体系,不须要预先指定列,在写入的时候指定便可。
public void createDataTable() { db.createDataTable(conf.getDataTableName()); }
元数据导入,咱们将一个本地的csv文件中的数据导入到数据库中
public void importMeta() throws IOException { TimestreamMetaTable metaTable = db.metaTable(); String [] fileHeader = {"分类", "名称", "监管号", "受理号", "生产日期", "有效日期", "注册分类", "申请类型", "企业名称", "任务类型"}; String csvFile = conf.getMetaFile(); CSVFormat format = CSVFormat.DEFAULT.withHeader(fileHeader).withIgnoreHeaderCase().withTrim(); Reader reader = Files.newBufferedReader(Paths.get(csvFile)); CSVParser csvParser = new CSVParser(reader, format); for (CSVRecord r : csvParser.getRecords()) { TimestreamIdentifier identifier = new TimestreamIdentifier.Builder(r.get("分类")) .addTag("名称", r.get("名称")) .addTag("监管号", r.get("监管号")) .build(); TimestreamMeta meta = new TimestreamMeta(identifier); meta.addAttribute("produced_date", r.get("生产日期")); meta.addAttribute("period_of_validity", r.get("有效日期")); List<String> extension = new ArrayList(); extension.add("受理号=" + r.get("受理号")); extension.add("注册分类=" + r.get("注册分类")); extension.add("申请类型=" + r.get("申请类型")); extension.add("企业名称=" + r.get("企业名称")); extension.add("任务类型=" + r.get("任务类型")); meta.addAttribute("extension", new Gson().toJson(extension)); metaTable.put(meta); System.out.println(meta.toString()); } }
状态数据导入,这里loc, links,status在Meta和Data都存储了一次,Meta表中存储主要是作后续的索引查询,Data表中存储主要是作
public void importData() throws Exception { TimestreamMetaTable metaTable = db.metaTable(); TimestreamDataTable dataTable = db.dataTable(conf.getDataTableName()); String [] fileHeader = {"分类", "名称", "监管号", "生产日期", "位置", "环节", "状态"}; String csvFile = conf.getDataFile(); CSVFormat format = CSVFormat.DEFAULT.withHeader(fileHeader).withIgnoreHeaderCase().withTrim(); Reader reader = Files.newBufferedReader(Paths.get(csvFile)); CSVParser csvParser = new CSVParser(reader, format); for (CSVRecord r : csvParser.getRecords()) { TimestreamIdentifier identifier = new TimestreamIdentifier.Builder(r.get("分类")) .addTag("名称", r.get("名称")) .addTag("监管号", r.get("监管号")) .build(); TimestreamMeta meta = new TimestreamMeta(identifier); String loc = toLocationString(r.get("位置")); String links = r.get("环节"); String status = r.get("状态"); meta.addAttribute("loc", loc); meta.addAttribute("links", links); meta.addAttribute("status", status); metaTable.update(meta); Point point = new Point.Builder(this.getTimestamp(r, "生产日期"), TimeUnit.MILLISECONDS) .addField("loc", loc) .addField("links", links) .addField("status", status) .build(); dataTable.asyncWrite(identifier, point); System.out.println(point.toString()); } dataTable.flush(); }
1. 基本的药品详细信息查询,主要是根据用户输入条件,显示药品的元数据。咱们这里根据药品分类、药品名称、生产企业来查询药品。
Filter filter = and( Name.equal("中药"), Tag.equal("名称", "复方阿胶"), Attribute.in("extension", new String[]{"企业名称=山东****也有限公司"}) ); Iterator<TimestreamMeta> iter = metaTable.filter(filter).fetchAll(); while (iter.hasNext()) { TimestreamMeta m = iter.next(); System.out.println(m); }
2. 药品的防伪鉴定,结合生产日期,运输轨迹、销售状态和查询用户等数据对药品实行防伪鉴定。咱们这里输入名称和药品监管码。
Filter filter = and( Name.equal("中药"), Tag.equal("名称", "复方阿胶"), Tag.equal("监管号", "8160000000000019") ); Iterator<TimestreamMeta> iter = metaTable.filter(filter).selectAttributes("status").fetchAll(); while (iter.hasNext()) { TimestreamMeta m = iter.next(); System.out.println(m.getAttributeAsString("status")); } // 从查询的结果来看,药品处于召回中,有使用风险
3. 查询指定地点范围内的特定药品。好比查询使用者5KM范围的“阿莫西林”。
Filter filter = and( Name.equal("化药"), Tag.prefix("名称", "阿莫西林"), Attribute.inGeoDistance("loc", "31.6533906593,103.8427768645", 5 * 1000) ); Iterator<TimestreamMeta> iter = metaTable.filter(filter).fetchAll(); while (iter.hasNext()) { TimestreamMeta m = iter.next(); System.out.println(m); }
4. 药品轨迹重放,遍历指定药品的一个轨迹溯源信息。
TimestreamIdentifier identifier = new TimestreamIdentifier.Builder("化药") .addTag("名称", "阿莫西林") .addTag("监管号", "8150000000000000") .build(); Iterator<Point> iter = dataTable.get(identifier).select("loc").fetchAll(); while (iter.hasNext()) { Point p = iter.next(); System.out.println(p); }
原文连接 本文为云栖社区原创内容,未经容许不得转载。