By following the steps in "Notes on integrating Nutch with 起点R3 (parts 1, 2 and 3)", we can build a search engine for an industry intranet. One of the most important capabilities of a search engine, however, is finding the newest content on the network. This requires Nutch to crawl the latest pages promptly and to push the newly crawled content into the Solr index. This article therefore describes how to handle index updating and deduplication.
We can run Nutch's crawl command at any time. For example, crawl the Sina site http://www.sina.com.cn at 8:00 this morning and index it into Solr with Nutch's solrindex command, then crawl the site again at 10:00 and index it into Solr once more. This guarantees that users can search reasonably fresh content. On the Solr side this poses no problem, because Solr updates its index with an update (overwrite) operation: the field named id is the unique key, so as long as the id is unique, the update to the index succeeds.
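To make this overwrite-by-id behaviour concrete, here is a minimal SolrJ sketch (not part of Nutch; it assumes a Solr core reachable at http://localhost:8983/solr whose uniqueKey is id and which has a title field): adding two documents with the same id leaves only one document in the index, because Solr treats an add with an existing unique key as an update.

// Minimal sketch, assuming a local Solr core and an id/title schema.
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

public class UpdateByIdDemo {
  public static void main(String[] args) throws Exception {
    SolrServer solr = new CommonsHttpSolrServer("http://localhost:8983/solr");

    SolrInputDocument first = new SolrInputDocument();
    first.addField("id", "20679dc38f64730579a1b2538727f76f");
    first.addField("title", "Sina home page, 8:00 crawl");
    solr.add(first);

    SolrInputDocument second = new SolrInputDocument();
    second.addField("id", "20679dc38f64730579a1b2538727f76f"); // same unique key
    second.addField("title", "Sina home page, 10:00 crawl");
    solr.add(second);                                           // overwrites the first

    solr.commit(); // the index now holds a single document for this id
  }
}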
Looking back at "Notes on integrating Nutch with 起点R3 (part 2)", we defined digest as the id there, instead of defining url as the id in the schema under Nutch's conf directory. The reason is that when url is used as the id, 起点R3 exhibits problems such as indexed data not showing up. After analyzing this for a long time, my conclusion is that a url may contain characters such as : / ? and &, and if these characters are not escaped, submitting them in Solr's q parameter causes problems.
Let's look at a Solr query log entry:
[search] webapp=null path=/select params={hl.snippets=3&q=id%3A20679dc38f64730579a1b2538727f76f&hl.simple.pre=%3Cfont+color%3D%27red%27%3E&hl.simple.post=%3C%2Ffont%3E&hl.fl=title&hl.fl=text&hl.usePhraseHighlighter=false&hl=true} hits=1 status=0 QTime=2
Here q=id%3A20679dc38f64730579a1b2538727f76f is a query for the document whose id is 20679dc38f64730579a1b2538727f76f. If url were used as the id and the url contained an & character, everything after the & would be parsed as a separate request parameter and the query would not return the correct result. So when Nutch indexes into Solr, defining the url as the ID is not a good choice.
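As a rough illustration (plain JDK, not Nutch code), the sketch below shows the two layers of escaping a URL-valued id would need before it could safely be sent as q=id:<value>: the Lucene/Solr query-parser characters (: / ? and so on) and the HTTP parameter encoding for &. The example URL with ?id=1&page=2 is made up; a digest id needs neither kind of escaping.

import java.net.URLEncoder;

public class IdEscapeDemo {

  // Backslash-escape the characters that are special to the Lucene/Solr query parser.
  static String escapeQuery(String s) {
    StringBuilder sb = new StringBuilder();
    for (char c : s.toCharArray()) {
      if ("+-&|!(){}[]^\"~*?:\\/".indexOf(c) >= 0) {
        sb.append('\\');
      }
      sb.append(c);
    }
    return sb.toString();
  }

  public static void main(String[] args) throws Exception {
    String urlId = "http://www.sina.com.cn/news?id=1&page=2";

    // Sent raw, "&page=2" becomes a separate HTTP parameter, and ":" "/" "?"
    // confuse the query parser. It only works after escaping both layers:
    String q = "id:" + escapeQuery(urlId);
    System.out.println("q=" + URLEncoder.encode(q, "UTF-8"));

    // A digest id contains only hex characters, so the value itself needs no escaping:
    System.out.println("q=id:20679dc38f64730579a1b2538727f76f");
  }
}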
In Nutch, the digest is a 32-character hexadecimal hash (by default an MD5) of each fetched page's content. If two pages have exactly the same content, their digest values are identical; but if one of them has even a single extra or missing space, their digests differ. So in my view, using digest as the id is a very good choice.
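Conceptually the digest behaves like the MD5 hex sketch below (Nutch actually computes it through its Signature plugins, by default MD5Signature over the fetched content; the HTML strings here are just placeholders): identical content gives identical digests, and a single extra space changes the value completely.

import java.security.MessageDigest;

public class DigestSketch {

  // MD5 of the raw bytes, rendered as the familiar 32-character hex string.
  static String md5Hex(byte[] data) throws Exception {
    byte[] hash = MessageDigest.getInstance("MD5").digest(data);
    StringBuilder sb = new StringBuilder();
    for (byte b : hash) {
      sb.append(String.format("%02x", b & 0xff));
    }
    return sb.toString();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(md5Hex("<html>same page</html>".getBytes("UTF-8")));
    // One extra space -> a completely different digest:
    System.out.println(md5Hex("<html>same page </html>".getBytes("UTF-8")));
  }
}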
If Nutch crawls the same page at two different times, say the Sina home page http://www.sina.com.cn, and the page has not changed between the two crawls, the digests computed for the two fetches are identical. When Nutch indexes the second crawl into Solr, a document with that digest (the unique id) already exists in the index, so the add merely overwrites it and no new record is created. Duplicate records in the index are thereby avoided.
Likewise, using digest as the id also prevents mirrored sites on an industry intranet from producing duplicate records in the Solr index.
In short, using digest as the id prevents Nutch from writing duplicate records into the Solr index; it provides natural deduplication.
But one problem remains. If Nutch crawls a page at two different times and the page has changed in between, the two digests are necessarily different, so when Nutch updates the Solr index the same url (e.g. http://www.sina.com.cn) ends up as two records. A search for 新浪 (Sina) then returns two entries for the Sina site. By the same logic, if Nutch crawls the same url n times, the Solr index may contain n nearly identical records. This situation drove me crazy and left me stuck for a while.
This is really a matter of deduplicating records that share the same url. Nutch ships a deduplication module, org.apache.nutch.indexer.solr.SolrDeleteDuplicates.java, invoked with bin/nutch solrdedup. Reading it shows that it deduplicates records with the same digest; presumably because Nutch assumes url is Solr's unique key id, it never considers deduplicating records that share the same url. The code is as follows:
package org.apache.nutch.indexer.solr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.MalformedURLException;
import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TimingUtil;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;

/**
 * Utility class for deleting duplicate documents from a solr index.
 *
 * The algorithm goes like follows:
 *
 * Preparation:
 * <ol>
 * <li>Query the solr server for the number of documents (say, N)</li>
 * <li>Partition N among M map tasks. For example, if we have two map tasks
 * the first map task will deal with solr documents from 0 - (N / 2 - 1) and
 * the second will deal with documents from (N / 2) to (N - 1).</li>
 * </ol>
 *
 * MapReduce:
 * <ul>
 * <li>Map: Identity map where keys are digests and values are {@link SolrRecord}
 * instances(which contain id, boost and timestamp)</li>
 * <li>Reduce: After map, {@link SolrRecord}s with the same digest will be
 * grouped together. Now, of these documents with the same digests, delete
 * all of them except the one with the highest score (boost field). If two
 * (or more) documents have the same score, then the document with the latest
 * timestamp is kept. Again, every other is deleted from solr index.
 * </li>
 * </ul>
 *
 * Note that unlike {@link DeleteDuplicates} we assume that two documents in
 * a solr index will never have the same URL. So this class only deals with
 * documents with <b>different</b> URLs but the same digest.
 */
public class SolrDeleteDuplicates
    implements Reducer<Text, SolrDeleteDuplicates.SolrRecord, Text, SolrDeleteDuplicates.SolrRecord>,
    Tool {

  public static final Log LOG = LogFactory.getLog(SolrDeleteDuplicates.class);

  private static final String SOLR_GET_ALL_QUERY = SolrConstants.ID_FIELD + ":[* TO *]";

  private static final int NUM_MAX_DELETE_REQUEST = 1000;

  public static class SolrRecord implements Writable {

    private float boost;
    private long tstamp;
    private String id;

    public SolrRecord() { }

    public SolrRecord(SolrRecord old) {
      this.id = old.id;
      this.boost = old.boost;
      this.tstamp = old.tstamp;
    }

    public SolrRecord(String id, float boost, long tstamp) {
      this.id = id;
      this.boost = boost;
      this.tstamp = tstamp;
    }

    public String getId() { return id; }

    public float getBoost() { return boost; }

    public long getTstamp() { return tstamp; }

    public void readSolrDocument(SolrDocument doc) {
      id = (String)doc.getFieldValue(SolrConstants.ID_FIELD);
      boost = (Float)doc.getFieldValue(SolrConstants.BOOST_FIELD);
      Date buffer = (Date)doc.getFieldValue(SolrConstants.TIMESTAMP_FIELD);
      tstamp = buffer.getTime();
    }

    public void readFields(DataInput in) throws IOException {
      id = Text.readString(in);
      boost = in.readFloat();
      tstamp = in.readLong();
    }

    public void write(DataOutput out) throws IOException {
      Text.writeString(out, id);
      out.writeFloat(boost);
      out.writeLong(tstamp);
    }
  }

  public static class SolrInputSplit implements InputSplit {

    private int docBegin;
    private int numDocs;

    public SolrInputSplit() { }

    public SolrInputSplit(int docBegin, int numDocs) {
      this.docBegin = docBegin;
      this.numDocs = numDocs;
    }

    public int getDocBegin() { return docBegin; }

    public int getNumDocs() { return numDocs; }

    public long getLength() throws IOException {
      return numDocs;
    }

    public String[] getLocations() throws IOException {
      return new String[] {};
    }

    public void readFields(DataInput in) throws IOException {
      docBegin = in.readInt();
      numDocs = in.readInt();
    }

    public void write(DataOutput out) throws IOException {
      out.writeInt(docBegin);
      out.writeInt(numDocs);
    }
  }

  public static class SolrInputFormat implements InputFormat<Text, SolrRecord> {

    /** Return each index as a split. */
    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
      SolrServer solr = new CommonsHttpSolrServer(job.get(SolrConstants.SERVER_URL));

      final SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY);
      solrQuery.setFields(SolrConstants.ID_FIELD);
      solrQuery.setRows(1);

      QueryResponse response;
      try {
        response = solr.query(solrQuery);
      } catch (final SolrServerException e) {
        throw new IOException(e);
      }

      int numResults = (int)response.getResults().getNumFound();
      int numDocsPerSplit = (numResults / numSplits);
      int currentDoc = 0;
      SolrInputSplit[] splits = new SolrInputSplit[numSplits];
      for (int i = 0; i < numSplits - 1; i++) {
        splits[i] = new SolrInputSplit(currentDoc, numDocsPerSplit);
        currentDoc += numDocsPerSplit;
      }
      splits[splits.length - 1] = new SolrInputSplit(currentDoc, numResults - currentDoc);

      return splits;
    }

    public RecordReader<Text, SolrRecord> getRecordReader(final InputSplit split,
        final JobConf job, Reporter reporter) throws IOException {

      SolrServer solr = new CommonsHttpSolrServer(job.get(SolrConstants.SERVER_URL));
      SolrInputSplit solrSplit = (SolrInputSplit) split;
      final int numDocs = solrSplit.getNumDocs();

      SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY);
      solrQuery.setFields(SolrConstants.ID_FIELD, SolrConstants.BOOST_FIELD,
          SolrConstants.TIMESTAMP_FIELD, SolrConstants.DIGEST_FIELD);
      solrQuery.setStart(solrSplit.getDocBegin());
      solrQuery.setRows(numDocs);

      QueryResponse response;
      try {
        response = solr.query(solrQuery);
      } catch (final SolrServerException e) {
        throw new IOException(e);
      }

      final SolrDocumentList solrDocs = response.getResults();

      return new RecordReader<Text, SolrRecord>() {

        private int currentDoc = 0;

        public void close() throws IOException { }

        public Text createKey() {
          return new Text();
        }

        public SolrRecord createValue() {
          return new SolrRecord();
        }

        public long getPos() throws IOException {
          return currentDoc;
        }

        public float getProgress() throws IOException {
          return currentDoc / (float) numDocs;
        }

        public boolean next(Text key, SolrRecord value) throws IOException {
          if (currentDoc >= numDocs) {
            return false;
          }

          SolrDocument doc = solrDocs.get(currentDoc);
          String digest = (String) doc.getFieldValue(SolrConstants.DIGEST_FIELD);
          key.set(digest);
          value.readSolrDocument(doc);

          currentDoc++;
          return true;
        }
      };
    }
  }

  private Configuration conf;

  private SolrServer solr;

  private int numDeletes = 0;

  private UpdateRequest updateRequest = new UpdateRequest();

  public Configuration getConf() {
    return conf;
  }

  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  public void configure(JobConf job) {
    try {
      solr = new CommonsHttpSolrServer(job.get(SolrConstants.SERVER_URL));
    } catch (MalformedURLException e) {
      throw new RuntimeException(e);
    }
  }

  public void close() throws IOException {
    try {
      if (numDeletes > 0) {
        LOG.info("SolrDeleteDuplicates: deleting " + numDeletes + " duplicates");
        updateRequest.process(solr);
        solr.commit();
      }
    } catch (SolrServerException e) {
      throw new IOException(e);
    }
  }

  public void reduce(Text key, Iterator<SolrRecord> values,
      OutputCollector<Text, SolrRecord> output, Reporter reporter) throws IOException {
    SolrRecord recordToKeep = new SolrRecord(values.next());
    while (values.hasNext()) {
      SolrRecord solrRecord = values.next();
      if (solrRecord.getBoost() > recordToKeep.getBoost() ||
          (solrRecord.getBoost() == recordToKeep.getBoost() &&
              solrRecord.getTstamp() > recordToKeep.getTstamp())) {
        updateRequest.deleteById(recordToKeep.id);
        recordToKeep = new SolrRecord(solrRecord);
      } else {
        updateRequest.deleteById(solrRecord.id);
      }
      numDeletes++;
      if (numDeletes >= NUM_MAX_DELETE_REQUEST) {
        try {
          LOG.info("SolrDeleteDuplicates: deleting " + numDeletes + " duplicates");
          updateRequest.process(solr);
        } catch (SolrServerException e) {
          throw new IOException(e);
        }
        updateRequest = new UpdateRequest();
        numDeletes = 0;
      }
    }
  }

  public void dedup(String solrUrl) throws IOException {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    long start = System.currentTimeMillis();
    LOG.info("SolrDeleteDuplicates: starting at " + sdf.format(start));
    LOG.info("SolrDeleteDuplicates: Solr url: " + solrUrl);

    JobConf job = new NutchJob(getConf());

    job.set(SolrConstants.SERVER_URL, solrUrl);
    job.setInputFormat(SolrInputFormat.class);
    job.setOutputFormat(NullOutputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(SolrRecord.class);
    job.setMapperClass(IdentityMapper.class);
    job.setReducerClass(SolrDeleteDuplicates.class);

    JobClient.runJob(job);

    long end = System.currentTimeMillis();
    LOG.info("SolrDeleteDuplicates: finished at " + sdf.format(end) +
        ", elapsed: " + TimingUtil.elapsedTime(start, end));
  }

  public int run(String[] args) throws IOException {
    if (args.length != 1) {
      System.err.println("Usage: SolrDeleteDuplicates <solr url>");
      return 1;
    }

    dedup(args[0]);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int result = ToolRunner.run(NutchConfiguration.create(),
        new SolrDeleteDuplicates(), args);
    System.exit(result);
  }
}
The MapReduce notes in the Javadoc above describe how the deletion of documents with identical digests is implemented on Hadoop MapReduce:
1. Group the documents with the same digest together, keep only the document with the highest score (boost value), and delete the other documents with that digest (deduplication);
2. If two or more documents share the highest score (boost value), keep the one with the latest timestamp and delete the rest.
Studying the code shows that replacing DIGEST_FIELD with URL_FIELD in just two places is enough to deduplicate records that share the same url; a sketch of the change follows.
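Both replacements sit inside SolrInputFormat.getRecordReader(). The excerpt below is only a sketch of the modified lines, assuming SolrConstants defines URL_FIELD (in Nutch 1.x it maps to the url field of the Solr schema):

// 1) Ask Solr for the url field instead of the digest field:
solrQuery.setFields(SolrConstants.ID_FIELD, SolrConstants.BOOST_FIELD,
    SolrConstants.TIMESTAMP_FIELD, SolrConstants.URL_FIELD);

// 2) In next(), key each SolrRecord by url instead of digest, so that reduce()
//    groups the documents sharing one url and keeps only the best of them:
String url = (String) doc.getFieldValue(SolrConstants.URL_FIELD);
key.set(url);

With url as the grouping key, the unchanged reduce() logic (highest boost, ties broken by latest timestamp) decides which record of each url survives.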
After making this change, compile it in Eclipse, locate the main class org.apache.nutch.indexer.solr.SolrDeleteDuplicates, and set up a SolrDeleteDuplicates run configuration, with the program argument set to the Solr server URL (the single <solr url> argument that run() expects).
Click "Run", and the output in the Eclipse console shows that the url deduplication succeeded.
To summarize: this article proposes using digest as the ID when Nutch builds the Solr index, which avoids the missing-query-results problem that occurs when Nutch uses url as the ID; and by modifying Nutch's deduplication module org.apache.nutch.indexer.solr.SolrDeleteDuplicates.java, deduplication and updating by url are achieved.