- added concurrency to postprocessing of webgraph documents

- bundled separate webgraph postprocessing steps into one
pull/1/head
Michael Peter Christen 11 years ago
parent 5f4a6892c1
commit 51800007c4
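The essence of this change: the Solr connector query methods now take a concurrency argument and terminate their result queue with one POISON sentinel per consumer, so several postprocessing threads can drain the same BlockingQueue and each stop cleanly when it takes a sentinel. A minimal self-contained sketch of that producer/consumer termination pattern (illustrative names only, not the YaCy classes):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class PoisonPillDemo {
        // unique sentinel; consumers compare by reference, so use a distinct instance
        static final String POISON = new String("POISON");

        public static void main(String[] args) throws InterruptedException {
            final int concurrency = 4;
            final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(200);

            // producer: feed the results, then one poison pill per consumer
            Thread producer = new Thread() {
                public void run() {
                    try {
                        for (int i = 0; i < 1000; i++) queue.put("doc-" + i);
                    } catch (InterruptedException e) {
                    } finally {
                        for (int i = 0; i < concurrency; i++) {
                            try {queue.put(POISON);} catch (InterruptedException e) {}
                        }
                    }
                }
            };
            producer.start();

            // consumers: each thread drains the shared queue until it takes a pill
            Thread[] t = new Thread[concurrency];
            for (int j = 0; j < concurrency; j++) {
                t[j] = new Thread() {
                    public void run() {
                        try {
                            String doc;
                            while ((doc = queue.take()) != POISON) {
                                // postprocess doc here
                            }
                        } catch (InterruptedException e) {}
                    }
                };
                t[j].start();
            }
            for (int j = 0; j < concurrency; j++) t[j].join();
        }
    }

Exactly one sentinel is enqueued per consumer, so every thread terminates and no sentinel is left behind in the queue.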

@@ -272,7 +272,7 @@ public class HostBrowser {
q.append(" AND ").append(CollectionSchema.url_paths_sxt.getSolrFieldName()).append(AbstractSolrConnector.CATCHALL_DTERM);
}
}
-BlockingQueue<SolrDocument> docs = fulltext.getDefaultConnector().concurrentDocumentsByQuery(q.toString(), 0, 100000, TIMEOUT, 100,
+BlockingQueue<SolrDocument> docs = fulltext.getDefaultConnector().concurrentDocumentsByQuery(q.toString(), 0, 100000, TIMEOUT, 100, 1,
CollectionSchema.id.getSolrFieldName(),
CollectionSchema.sku.getSolrFieldName(),
CollectionSchema.failreason_s.getSolrFieldName(),

@@ -130,7 +130,7 @@ public class IndexDeletion_p {
}
try {
DigestURL u = new DigestURL(urlStub);
-BlockingQueue<SolrDocument> dq = defaultConnector.concurrentDocumentsByQuery(CollectionSchema.host_s.getSolrFieldName() + ":\"" + u.getHost() + "\"", 0, 100000000, Long.MAX_VALUE, 100, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
+BlockingQueue<SolrDocument> dq = defaultConnector.concurrentDocumentsByQuery(CollectionSchema.host_s.getSolrFieldName() + ":\"" + u.getHost() + "\"", 0, 100000000, Long.MAX_VALUE, 100, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
SolrDocument doc;
try {
while ((doc = dq.take()) != AbstractSolrConnector.POISON_DOCUMENT) {

@@ -178,7 +178,7 @@ public class SchemaConfiguration extends Configuration implements Serializable {
return changed;
}
-public boolean postprocessing_clickdepth(ClickdepthCache clickdepthCache, SolrInputDocument sid, DigestURL url, SchemaDeclaration clickdepthfield, int maxtime) {
+public boolean postprocessing_clickdepth(final ClickdepthCache clickdepthCache, final SolrInputDocument sid, final DigestURL url, final SchemaDeclaration clickdepthfield, final int maxtime) {
if (!this.contains(clickdepthfield)) return false;
// get new click depth and compare with old
Integer oldclickdepth = (Integer) sid.getFieldValue(clickdepthfield.getSolrFieldName());
@@ -194,7 +194,7 @@ public class SchemaConfiguration extends Configuration implements Serializable {
return false;
}
-public boolean postprocessing_references(ReferenceReportCache rrCache, SolrInputDocument sid, DigestURL url, Map<String, Long> hostExtentCount) {
+public boolean postprocessing_references(final ReferenceReportCache rrCache, final SolrInputDocument sid, final DigestURL url, final Map<String, Long> hostExtentCount) {
if (!(this.contains(CollectionSchema.references_i) ||
this.contains(CollectionSchema.references_internal_i) ||
this.contains(CollectionSchema.references_external_i) || this.contains(CollectionSchema.references_exthosts_i))) return false;

@@ -138,10 +138,18 @@ public abstract class AbstractSolrConnector implements SolrConnector {
* @param maxcount the maximum number of results
* @param maxtime the maximum time in milliseconds
* @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used
+* @param concurrency is the number of AbstractSolrConnector.POISON_DOCUMENT entries to add at the end of the feed
* @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element
*/
@Override
-public BlockingQueue<SolrDocument> concurrentDocumentsByQuery(final String querystring, final int offset, final int maxcount, final long maxtime, final int buffersize, final String ... fields) {
+public BlockingQueue<SolrDocument> concurrentDocumentsByQuery(
+final String querystring,
+final int offset,
+final int maxcount,
+final long maxtime,
+final int buffersize,
+final int concurrency,
+final String ... fields) {
final BlockingQueue<SolrDocument> queue = buffersize <= 0 ? new LinkedBlockingQueue<SolrDocument>() : new ArrayBlockingQueue<SolrDocument>(buffersize);
final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity!
final Thread t = new Thread() {
@@ -157,7 +165,7 @@ public abstract class AbstractSolrConnector implements SolrConnector {
try {queue.put(d);} catch (final InterruptedException e) {break;}
count++;
}
-if (sdl.size() <= 0) break;
+if (sdl.size() < pagesize) break;
o += sdl.size();
} catch (final SolrException e) {
break;
@@ -165,7 +173,9 @@ public abstract class AbstractSolrConnector implements SolrConnector {
break;
}
}
-try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {}
+for (int i = 0; i < concurrency; i++) {
+try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {}
+}
}
};
t.start();
@@ -173,8 +183,14 @@ public abstract class AbstractSolrConnector implements SolrConnector {
}
@Override
-public BlockingQueue<String> concurrentIDsByQuery(final String querystring, final int offset, final int maxcount, final long maxtime) {
-final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+public BlockingQueue<String> concurrentIDsByQuery(
+final String querystring,
+final int offset,
+final int maxcount,
+final long maxtime,
+final int buffersize,
+final int concurrency) {
+final BlockingQueue<String> queue = buffersize <= 0 ? new LinkedBlockingQueue<String>() : new ArrayBlockingQueue<String>(buffersize);
final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity!
final Thread t = new Thread() {
@Override
@@ -187,7 +203,7 @@ public abstract class AbstractSolrConnector implements SolrConnector {
for (SolrDocument d: sdl) {
try {queue.put((String) d.getFieldValue(CollectionSchema.id.getSolrFieldName()));} catch (final InterruptedException e) {break;}
}
-if (sdl.size() <= 0) break;
+if (sdl.size() < pagesize) break;
o += sdl.size();
} catch (final SolrException e) {
break;
@@ -195,7 +211,9 @@ public abstract class AbstractSolrConnector implements SolrConnector {
break;
}
}
-try {queue.put(AbstractSolrConnector.POISON_ID);} catch (final InterruptedException e1) {}
+for (int i = 0; i < concurrency; i++) {
+try {queue.put(AbstractSolrConnector.POISON_ID);} catch (final InterruptedException e1) {}
+}
}
};
t.start();
@@ -204,7 +222,7 @@ public abstract class AbstractSolrConnector implements SolrConnector {
@Override
public Iterator<String> iterator() {
-final BlockingQueue<String> queue = concurrentIDsByQuery(CATCHALL_QUERY, 0, Integer.MAX_VALUE, 60000);
+final BlockingQueue<String> queue = concurrentIDsByQuery(CATCHALL_QUERY, 0, Integer.MAX_VALUE, 60000, 2 * pagesize, 1);
return new LookAheadIterator<String>() {
@Override
protected String next0() {

@@ -415,13 +415,13 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
}
@Override
-public BlockingQueue<SolrDocument> concurrentDocumentsByQuery(String querystring, int offset, int maxcount, long maxtime, int buffersize, String... fields) {
-return this.connector.concurrentDocumentsByQuery(querystring, offset, maxcount, maxtime, buffersize, fields);
+public BlockingQueue<SolrDocument> concurrentDocumentsByQuery(String querystring, int offset, int maxcount, long maxtime, int buffersize, final int concurrency, String... fields) {
+return this.connector.concurrentDocumentsByQuery(querystring, offset, maxcount, maxtime, buffersize, concurrency, fields);
}
@Override
-public BlockingQueue<String> concurrentIDsByQuery(String querystring, int offset, int maxcount, long maxtime) {
-return this.connector.concurrentIDsByQuery(querystring, offset, maxcount, maxtime);
+public BlockingQueue<String> concurrentIDsByQuery(String querystring, int offset, int maxcount, long maxtime, int buffersize, final int concurrency) {
+return this.connector.concurrentIDsByQuery(querystring, offset, maxcount, maxtime, buffersize, concurrency);
}
}

@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -427,8 +428,8 @@ public class EmbeddedSolrConnector extends SolrServerConnector implements SolrConnector {
}
@Override
-public synchronized BlockingQueue<String> concurrentIDsByQuery(final String querystring, final int offset, final int maxcount, final long maxtime) {
-final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+public synchronized BlockingQueue<String> concurrentIDsByQuery(final String querystring, final int offset, final int maxcount, final long maxtime, final int buffersize, final int concurrency) {
+final BlockingQueue<String> queue = buffersize <= 0 ? new LinkedBlockingQueue<String>() : new ArrayBlockingQueue<String>(buffersize);
final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity!
final Thread t = new Thread() {
@Override

@@ -427,10 +427,10 @@ public class MirrorSolrConnector extends AbstractSolrConnector implements SolrConnector {
}
@Override
-public BlockingQueue<String> concurrentIDsByQuery(String querystring, int offset, int maxcount, long maxtime) {
-if (this.solr0 != null && this.solr1 == null) return this.solr0.concurrentIDsByQuery(querystring, offset, maxcount, maxtime);
-if (this.solr0 == null && this.solr1 != null) return this.solr1.concurrentIDsByQuery(querystring, offset, maxcount, maxtime);
-return super.concurrentIDsByQuery(querystring, offset, maxcount, maxtime);
+public BlockingQueue<String> concurrentIDsByQuery(final String querystring, final int offset, final int maxcount, final long maxtime, final int buffersize, final int concurrency) {
+if (this.solr0 != null && this.solr1 == null) return this.solr0.concurrentIDsByQuery(querystring, offset, maxcount, maxtime, buffersize, concurrency);
+if (this.solr0 == null && this.solr1 != null) return this.solr1.concurrentIDsByQuery(querystring, offset, maxcount, maxtime, buffersize, concurrency);
+return super.concurrentIDsByQuery(querystring, offset, maxcount, maxtime, buffersize, concurrency);
}
}

@@ -214,10 +214,18 @@ public interface SolrConnector extends Iterable<String> /* Iterable of document IDs */ {
* @param maxcount the maximum number of results
* @param maxtime the maximum time in milliseconds
* @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used
+* @param concurrency is the number of AbstractSolrConnector.POISON_DOCUMENT entries to add at the end of the feed
* @param fields list of fields
* @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element
*/
-public BlockingQueue<SolrDocument> concurrentDocumentsByQuery(final String querystring, final int offset, final int maxcount, final long maxtime, final int buffersize, final String ... fields);
+public BlockingQueue<SolrDocument> concurrentDocumentsByQuery(
+final String querystring,
+final int offset,
+final int maxcount,
+final long maxtime,
+final int buffersize,
+final int concurrency,
+final String ... fields);
/**
* get a document id result stream from a solr query.
@@ -226,8 +234,16 @@ public interface SolrConnector extends Iterable<String> /* Iterable of document IDs */ {
* @param querystring
* @param offset
* @param maxcount
+* @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used
+* @param concurrency is the number of AbstractSolrConnector.POISON_ID entries to add at the end of the feed
* @return
*/
-public BlockingQueue<String> concurrentIDsByQuery(final String querystring, final int offset, final int maxcount, final long maxtime);
+public BlockingQueue<String> concurrentIDsByQuery(
+final String querystring,
+final int offset,
+final int maxcount,
+final long maxtime,
+final int buffersize,
+final int concurrency);
}

@@ -2292,7 +2292,6 @@ public final class Switchboard extends serverSwitch {
// execute the (post-) processing steps for all entries that have a process tag assigned
Fulltext fulltext = index.fulltext();
CollectionConfiguration collection1Configuration = fulltext.getDefaultConfiguration();
-WebgraphConfiguration webgraphConfiguration = fulltext.getWebgraphConfiguration();
if (!this.crawlJobIsPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL) && MemoryControl.available() > 512L * 1024L * 1024L && Memory.load() < 2.5f) {
// we optimize first because that is useful for postprocessing
@@ -2302,10 +2301,9 @@ public final class Switchboard extends serverSwitch {
Set<String> deletionCandidates = collection1Configuration.contains(CollectionSchema.harvestkey_s.getSolrFieldName()) ?
this.crawler.getFinishesProfiles(this.crawlQueues) : new HashSet<String>();
int cleanupByHarvestkey = deletionCandidates.size();
-boolean processCollection = collection1Configuration.contains(CollectionSchema.process_sxt) && (index.connectedCitation() || fulltext.useWebgraph());
-boolean processWebgraph = webgraphConfiguration.contains(WebgraphSchema.process_sxt) && fulltext.useWebgraph();
+boolean postprocessing = collection1Configuration.contains(CollectionSchema.process_sxt) && (index.connectedCitation() || fulltext.useWebgraph());
boolean allCrawlsFinished = this.crawler.allCrawlsFinished(this.crawlQueues);
-if ((processCollection || processWebgraph) && (cleanupByHarvestkey > 0 || allCrawlsFinished)) {
+if (postprocessing && (cleanupByHarvestkey > 0 || allCrawlsFinished)) {
if (cleanupByHarvestkey > 0) {
// run postprocessing on these profiles
postprocessingRunning = true;
@@ -2315,13 +2313,6 @@ public final class Switchboard extends serverSwitch {
postprocessingStartTime[0] = 0;
try {postprocessingCount[0] = (int) fulltext.getDefaultConnector().getCountByQuery(CollectionSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM);} catch (IOException e) {} // should be zero but you never know
-if (processWebgraph) {
-postprocessingStartTime[1] = System.currentTimeMillis();
-try {postprocessingCount[1] = (int) fulltext.getWebgraphConnector().getCountByQuery(WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM);} catch (IOException e) {}
-for (String profileHash: deletionCandidates) proccount += webgraphConfiguration.postprocessing(index, clickdepthCache, profileHash);
-postprocessingStartTime[1] = 0;
-try {postprocessingCount[1] = (int) fulltext.getWebgraphConnector().getCountByQuery(WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM);} catch (IOException e) {}
-}
this.crawler.cleanProfiles(deletionCandidates);
log.info("cleanup removed " + cleanupByHarvestkey + " crawl profiles, post-processed " + proccount + " documents");
} else if (allCrawlsFinished) {
@@ -2333,13 +2324,6 @@ public final class Switchboard extends serverSwitch {
postprocessingStartTime[0] = 0;
try {postprocessingCount[0] = (int) fulltext.getDefaultConnector().getCountByQuery(CollectionSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM);} catch (IOException e) {} // should be zero but you never know
-if (processWebgraph) {
-postprocessingStartTime[1] = System.currentTimeMillis();
-try {postprocessingCount[1] = (int) fulltext.getWebgraphConnector().getCountByQuery(WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM);} catch (IOException e) {}
-proccount += webgraphConfiguration.postprocessing(index, clickdepthCache, null);
-postprocessingStartTime[1] = 0;
-try {postprocessingCount[1] = (int) fulltext.getWebgraphConnector().getCountByQuery(WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM);} catch (IOException e) {}
-}
this.crawler.cleanProfiles(this.crawler.getActiveProfiles());
log.info("cleanup post-processed " + proccount + " documents");
}

@@ -32,6 +32,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -426,16 +427,18 @@ public final class Fulltext {
final String collectionQuery = CollectionSchema.host_s.getSolrFieldName() + ":\"" + host + "\"" +
((freshdate != null && freshdate.before(new Date())) ? (" AND " + CollectionSchema.load_date_dt.getSolrFieldName() + ":[* TO " + ISO8601Formatter.FORMATTER.format(freshdate) + "]") : "");
final AtomicInteger count = new AtomicInteger(0);
-final BlockingQueue<SolrDocument> docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(collectionQuery, 0, 1000000, 600000, -1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
+final BlockingQueue<SolrDocument> docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(collectionQuery, 0, 1000000, 600000, 100, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
try {
+Set<String> deleteIDs = new HashSet<String>();
SolrDocument doc;
while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
String u = (String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName());
if (u.startsWith(basepath)) {
-remove(ASCII.getBytes((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName())));
+deleteIDs.add((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()));
count.incrementAndGet();
}
}
+remove(deleteIDs);
if (count.get() > 0) Fulltext.this.commit(true);
} catch (final InterruptedException e) {}
return count.get();
@@ -660,7 +663,7 @@ public final class Fulltext {
this.count++;
}
} else {
-BlockingQueue<SolrDocument> docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(this.query + " AND " + CollectionSchema.httpstatus_i.getSolrFieldName() + ":200", 0, 100000000, 10 * 60 * 60 * 1000, 100,
+BlockingQueue<SolrDocument> docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(this.query + " AND " + CollectionSchema.httpstatus_i.getSolrFieldName() + ":200", 0, 100000000, 10 * 60 * 60 * 1000, 100, 1,
CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.title.getSolrFieldName(),
CollectionSchema.author.getSolrFieldName(), CollectionSchema.description_txt.getSolrFieldName(), CollectionSchema.size_i.getSolrFieldName(), CollectionSchema.last_modified.getSolrFieldName());
SolrDocument doc;

@@ -30,13 +30,13 @@ import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Date;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import org.apache.solr.common.SolrDocument;
@@ -287,7 +287,7 @@ public class Segment {
public class ReferenceReportCache {
private final Map<String, ReferenceReport> cache;
public ReferenceReportCache() {
-this.cache = new HashMap<String, ReferenceReport>();
+this.cache = new ConcurrentHashMap<String, ReferenceReport>();
}
public ReferenceReport getReferenceReport(final String id, final boolean acceptSelfReference) throws IOException {
ReferenceReport rr = cache.get(id);
@@ -309,11 +309,11 @@ public class Segment {
}
public class ClickdepthCache {
-ReferenceReportCache rrc;
-Map<String, Integer> cache;
+final ReferenceReportCache rrc;
+final Map<String, Integer> cache;
public ClickdepthCache(ReferenceReportCache rrc) {
this.rrc = rrc;
-this.cache = new HashMap<String, Integer>();
+this.cache = new ConcurrentHashMap<String, Integer>();
}
public int getClickdepth(final DigestURL url, int maxtime) throws IOException {
Integer clickdepth = cache.get(ASCII.String(url.hash()));
@@ -371,7 +371,7 @@ public class Segment {
if ((internalIDs.size() == 0 || !connectedCitation()) && Segment.this.fulltext.useWebgraph()) {
// read the references from the webgraph
SolrConnector webgraph = Segment.this.fulltext.getWebgraphConnector();
-BlockingQueue<SolrDocument> docs = webgraph.concurrentDocumentsByQuery("{!raw f=" + WebgraphSchema.target_id_s.getSolrFieldName() + "}" + ASCII.String(id), 0, 10000000, 1000, 100, WebgraphSchema.source_id_s.getSolrFieldName());
+BlockingQueue<SolrDocument> docs = webgraph.concurrentDocumentsByQuery("{!raw f=" + WebgraphSchema.target_id_s.getSolrFieldName() + "}" + ASCII.String(id), 0, 10000000, 1000, 100, 1, WebgraphSchema.source_id_s.getSolrFieldName());
SolrDocument doc;
try {
while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
@@ -474,12 +474,12 @@ public class Segment {
final BlockingQueue<SolrDocument> docQueue;
final String urlstub;
if (stub == null) {
-docQueue = this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(AbstractSolrConnector.CATCHALL_QUERY, 0, Integer.MAX_VALUE, maxtime, maxcount, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
+docQueue = this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(AbstractSolrConnector.CATCHALL_QUERY, 0, Integer.MAX_VALUE, maxtime, maxcount, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
urlstub = null;
} else {
final String host = stub.getHost();
String hh = DigestURL.hosthash(host);
-docQueue = this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(CollectionSchema.host_id_s + ":\"" + hh + "\"", 0, Integer.MAX_VALUE, maxtime, maxcount, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
+docQueue = this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(CollectionSchema.host_id_s + ":\"" + hh + "\"", 0, Integer.MAX_VALUE, maxtime, maxcount, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
urlstub = stub.toNormalform(true);
}

@@ -42,6 +42,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import net.yacy.cora.document.analysis.EnhancedTextProfileSignature;
@@ -913,14 +915,14 @@ public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
* @param urlCitation
* @return
*/
-public int postprocessing(final Segment segment, ReferenceReportCache rrCache, ClickdepthCache clickdepthCache, String harvestkey) {
+public int postprocessing(final Segment segment, final ReferenceReportCache rrCache, final ClickdepthCache clickdepthCache, final String harvestkey) {
if (!this.contains(CollectionSchema.process_sxt)) return 0;
if (!segment.connectedCitation() && !segment.fulltext().useWebgraph()) return 0;
-SolrConnector collectionConnector = segment.fulltext().getDefaultConnector();
+final SolrConnector collectionConnector = segment.fulltext().getDefaultConnector();
collectionConnector.commit(false); // make sure that we have latest information that can be found
if (segment.fulltext().useWebgraph()) segment.fulltext().getWebgraphConnector().commit(false);
-CollectionConfiguration collection = segment.fulltext().getDefaultConfiguration();
-WebgraphConfiguration webgraph = segment.fulltext().getWebgraphConfiguration();
+final CollectionConfiguration collection = segment.fulltext().getDefaultConfiguration();
+final WebgraphConfiguration webgraph = segment.fulltext().getWebgraphConfiguration();
// collect hosts from index which shall take part in citation computation
@@ -936,15 +938,15 @@ public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
}
// create the ranking map
-Map<String, CRV> rankings = null;
+final Map<String, CRV> rankings = new ConcurrentHashMap<String, CRV>();
if ((segment.fulltext().useWebgraph() &&
((webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) ||
(webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i))) ||
(collection.contains(CollectionSchema.cr_host_count_i) &&
collection.contains(CollectionSchema.cr_host_chance_d) &&
collection.contains(CollectionSchema.cr_host_norm_i)))) try {
-ConcurrentLog.info("CollectionConfiguration", "collecting " + hostscore.size() + " hosts");
-rankings = new HashMap<String, CRV>();
+int concurrency = Math.min(hostscore.size(), Runtime.getRuntime().availableProcessors());
+ConcurrentLog.info("CollectionConfiguration", "collecting " + hostscore.size() + " hosts, concurrency = " + concurrency);
int countcheck = 0;
for (String host: hostscore.keyList(true)) {
// Patch the citation index for links with canonical tags.
@@ -953,7 +955,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
// To do so, we first must collect all canonical links, find all references to them, get the anchor list of the documents and patch the citation reference of these links
String patchquery = CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
long patchquerycount = collectionConnector.getCountByQuery(patchquery);
-BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, 0, 10000000, 600000, 100,
+BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, 0, 10000000, 600000, 200, 1,
CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.canonical_s.getSolrFieldName());
SolrDocument doc_B;
int patchquerycountcheck = 0;
@@ -1020,63 +1022,111 @@ public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
}
// process all documents at the webgraph for the outgoing links of this document
-SolrDocument doc;
-int allcount = 0;
+final AtomicInteger allcount = new AtomicInteger(0);
if (segment.fulltext().useWebgraph()) {
-Set<String> omitFields = new HashSet<String>();
+final Set<String> omitFields = new HashSet<String>();
omitFields.add(WebgraphSchema.process_sxt.getSolrFieldName());
omitFields.add(WebgraphSchema.harvestkey_s.getSolrFieldName());
try {
-int proccount = 0;
-long start = System.currentTimeMillis();
+final long start = System.currentTimeMillis();
for (String host: hostscore.keyList(true)) {
if (hostscore.get(host) <= 0) continue;
+final String hostfinal = host;
// select all webgraph edges and modify their cr value
query = WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + host + "\" AND " + WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
-long count = segment.fulltext().getWebgraphConnector().getCountByQuery(query);
-ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph");
-BlockingQueue<SolrDocument> docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(query, 0, 10000000, 1800000, 100);
-int countcheck = 0;
-while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
-SolrInputDocument sid = webgraph.toSolrInputDocument(doc, omitFields);
-if (webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) {
-String id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName());
-CRV crv = rankings.get(id);
-if (crv != null) {
-sid.setField(WebgraphSchema.source_cr_host_norm_i.getSolrFieldName(), crv.crn);
-}
-}
-if (webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i)) {
-String id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName());
-CRV crv = rankings.get(id);
-if (crv != null) {
-sid.setField(WebgraphSchema.target_cr_host_norm_i.getSolrFieldName(), crv.crn);
-}
-}
-try {
-sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName());
-sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName());
-segment.fulltext().getWebgraphConnector().deleteById((String) sid.getFieldValue(WebgraphSchema.id.getSolrFieldName()));
-segment.fulltext().getWebgraphConnector().add(sid);
-} catch (SolrException e) {
-ConcurrentLog.logException(e);
-} catch (IOException e) {
-ConcurrentLog.logException(e);
-}
-countcheck++;
-proccount++; allcount++;
-if (proccount % 1000 == 0) ConcurrentLog.info(
-"CollectionConfiguration", "webgraph - postprocessed " + proccount + " from " + count + " documents; " +
-(proccount * 1000 / (System.currentTimeMillis() - start)) + " docs/second; " +
-((System.currentTimeMillis() - start) * (count - proccount) / proccount / 60000) + " minutes remaining");
-}
-if (count != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous webgraph document count for host " + host + ": expected=" + count + ", counted=" + countcheck);
+final long count = segment.fulltext().getWebgraphConnector().getCountByQuery(query);
+int concurrency = Math.min((int) count, Math.max(1, Runtime.getRuntime().availableProcessors() / 4));
+ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph, concurrency = " + concurrency);
+final BlockingQueue<SolrDocument> docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(query, 0, 10000000, 1800000, 200, concurrency);
+final AtomicInteger proccount = new AtomicInteger(0);
+Thread[] t = new Thread[concurrency];
+for (final AtomicInteger i = new AtomicInteger(0); i.get() < t.length; i.incrementAndGet()) {
+t[i.get()] = new Thread() {
+private String name = "CollectionConfiguration.postprocessing.webgraph-" + i.get();
+public void run() {
+Thread.currentThread().setName(name);
+SolrDocument doc; String protocol, urlstub, id; DigestURL url;
+try {
+while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
+SolrInputDocument sid = webgraph.toSolrInputDocument(doc, omitFields);
+Collection<Object> proctags = doc.getFieldValues(WebgraphSchema.process_sxt.getSolrFieldName());
+Set<ProcessType> process = new HashSet<ProcessType>();
+for (Object tag: proctags) {
+ProcessType tagtype = ProcessType.valueOf((String) tag);
+process.add(tagtype);
+}
+// set cr values
+if (webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) {
+id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName());
+CRV crv = rankings.get(id);
+if (crv != null) {
+sid.setField(WebgraphSchema.source_cr_host_norm_i.getSolrFieldName(), crv.crn);
+}
+}
+if (webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i)) {
+id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName());
+CRV crv = rankings.get(id);
+if (crv != null) {
+sid.setField(WebgraphSchema.target_cr_host_norm_i.getSolrFieldName(), crv.crn);
+}
+}
+// set clickdepth
+if (process.contains(ProcessType.CLICKDEPTH)) {
+if (webgraph.contains(WebgraphSchema.source_clickdepth_i) && webgraph.contains(WebgraphSchema.source_protocol_s) && webgraph.contains(WebgraphSchema.source_urlstub_s) && webgraph.contains(WebgraphSchema.source_id_s)) {
+protocol = (String) doc.getFieldValue(WebgraphSchema.source_protocol_s.getSolrFieldName());
+urlstub = (String) doc.getFieldValue(WebgraphSchema.source_urlstub_s.getSolrFieldName());
+id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName());
+try {
+url = new DigestURL(protocol + "://" + urlstub, ASCII.getBytes(id));
+postprocessing_clickdepth(clickdepthCache, sid, url, WebgraphSchema.source_clickdepth_i, 100);
+} catch (MalformedURLException e) {
+}
+}
+if (webgraph.contains(WebgraphSchema.target_clickdepth_i) && webgraph.contains(WebgraphSchema.target_protocol_s) && webgraph.contains(WebgraphSchema.target_urlstub_s) && webgraph.contains(WebgraphSchema.target_id_s)) {
+protocol = (String) doc.getFieldValue(WebgraphSchema.target_protocol_s.getSolrFieldName());
+urlstub = (String) doc.getFieldValue(WebgraphSchema.target_urlstub_s.getSolrFieldName());
+id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName());
+try {
+url = new DigestURL(protocol + "://" + urlstub, ASCII.getBytes(id));
+postprocessing_clickdepth(clickdepthCache, sid, url, WebgraphSchema.target_clickdepth_i, 100);
+} catch (MalformedURLException e) {
+}
+}
+}
+// write document back to index
+try {
+sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName());
+sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName());
+segment.fulltext().getWebgraphConnector().deleteById((String) sid.getFieldValue(WebgraphSchema.id.getSolrFieldName()));
+segment.fulltext().getWebgraphConnector().add(sid);
+} catch (SolrException e) {
+ConcurrentLog.logException(e);
+} catch (IOException e) {
+ConcurrentLog.logException(e);
+}
+proccount.incrementAndGet();
+allcount.incrementAndGet();
+if (proccount.get() % 1000 == 0) ConcurrentLog.info(
+"CollectionConfiguration", "webgraph - postprocessed " + proccount + " from " + count + " documents; " +
+(proccount.get() * 1000 / (System.currentTimeMillis() - start)) + " docs/second; " +
+((System.currentTimeMillis() - start) * (count - proccount.get()) / proccount.get() / 60000) + " minutes remaining for host " + hostfinal);
+}
+} catch (InterruptedException e) {
+ConcurrentLog.warn("CollectionConfiguration", e.getMessage(), e);
+}
+}
+};
+t[i.get()].start();
}
+for (int i = 0; i < t.length; i++) try {t[i].join();} catch (InterruptedException e) {}
+if (count != proccount.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous webgraph document count for host " + host + ": expected=" + count + ", counted=" + proccount);
}
} catch (final IOException e2) {
ConcurrentLog.warn("CollectionConfiguration", e2.getMessage(), e2);
-} catch (final InterruptedException e3) {
-ConcurrentLog.warn("CollectionConfiguration", e3.getMessage(), e3);
}
}
@@ -1093,9 +1143,10 @@ public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
long count = collectionConnector.getCountByQuery(query);
long start = System.currentTimeMillis();
ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the collection for harvestkey " + harvestkey);
-BlockingQueue<SolrDocument> docs = collectionConnector.concurrentDocumentsByQuery(query, 0, 10000000, 1800000, 100);
+BlockingQueue<SolrDocument> docs = collectionConnector.concurrentDocumentsByQuery(query, 0, 10000000, 1800000, 200, 1);
int countcheck = 0;
Collection<String> failids = new ArrayList<String>();
+SolrDocument doc;
while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
// for each to-be-processed entry work on the process tag
Collection<Object> proctags = doc.getFieldValues(CollectionSchema.process_sxt.getSolrFieldName());
@@ -1118,7 +1169,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
collection.contains(CollectionSchema.cr_host_count_i) &&
collection.contains(CollectionSchema.cr_host_chance_d) &&
collection.contains(CollectionSchema.cr_host_norm_i)) {
-CRV crv = rankings.get(ASCII.String(id));
+CRV crv = rankings.remove(ASCII.String(id)); // instead of 'get'ting the CRV, we also remove it because we will not need it again and free some memory here
if (crv != null) {
sid.setField(CollectionSchema.cr_host_count_i.getSolrFieldName(), crv.count);
sid.setField(CollectionSchema.cr_host_chance_d.getSolrFieldName(), crv.cr);
@@ -1151,7 +1202,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
collectionConnector.deleteById(i);
collectionConnector.add(sid);
-proccount++; allcount++;
+proccount++; allcount.incrementAndGet();
if (proccount % 100 == 0) ConcurrentLog.info(
"CollectionConfiguration", "collection - postprocessed " + proccount + " from " + count + " documents; " +
(proccount * 1000 / (System.currentTimeMillis() - start)) + " docs/second; " +
@@ -1177,7 +1228,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
} catch (IOException e3) {
ConcurrentLog.warn("CollectionConfiguration", e3.getMessage(), e3);
}
-return allcount;
+return allcount.get();
}
private static final class CRV {
@@ -1211,10 +1262,10 @@ public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
this.rrCache = rrCache;
this.converge_eq_factor = (int) Math.pow(10.0d, converge_digits);
SolrConnector connector = segment.fulltext().getDefaultConnector();
-this.crt = new HashMap<String, double[]>();
+this.crt = new ConcurrentHashMap<String, double[]>();
try {
// select all documents for each host
-BlockingQueue<String> ids = connector.concurrentIDsByQuery("{!raw f=" + CollectionSchema.host_s.getSolrFieldName() + "}" + host, 0, 10000000, 600000);
+BlockingQueue<String> ids = connector.concurrentIDsByQuery("{!raw f=" + CollectionSchema.host_s.getSolrFieldName() + "}" + host, 0, 10000000, 600000, 200, 1);
String id;
while ((id = ids.take()) != AbstractSolrConnector.POISON_ID) {
this.crt.put(id, new double[]{0.0d,0.0d}); //{old value, new value}

@@ -35,7 +35,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
import java.util.regex.Pattern;
import org.apache.solr.common.SolrDocument;
@@ -48,15 +47,11 @@ import net.yacy.cora.document.id.MultiProtocolURL;
import net.yacy.cora.federate.solr.ProcessType;
import net.yacy.cora.federate.solr.SchemaConfiguration;
import net.yacy.cora.federate.solr.SchemaDeclaration;
-import net.yacy.cora.federate.solr.connector.AbstractSolrConnector;
-import net.yacy.cora.federate.solr.connector.SolrConnector;
import net.yacy.cora.protocol.Domains;
import net.yacy.cora.protocol.ResponseHeader;
import net.yacy.cora.util.CommonPattern;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.document.parser.html.ImageEntry;
-import net.yacy.search.index.Segment;
-import net.yacy.search.index.Segment.ClickdepthCache;
public class WebgraphConfiguration extends SchemaConfiguration implements Serializable {
@@ -306,74 +301,6 @@ public class WebgraphConfiguration extends SchemaConfiguration implements Serializable {
// return the edge
return edge;
}
-public int postprocessing(final Segment segment, ClickdepthCache clickdepthCache, final String harvestkey) {
-if (!this.contains(WebgraphSchema.process_sxt)) return 0;
-if (!segment.fulltext().useWebgraph()) return 0;
-SolrConnector webgraphConnector = segment.fulltext().getWebgraphConnector();
-// that means we must search for those entries.
-webgraphConnector.commit(true); // make sure that we have latest information that can be found
-//BlockingQueue<SolrDocument> docs = index.fulltext().getSolr().concurrentQuery("*:*", 0, 1000, 60000, 10);
-String query = (harvestkey == null || !this.contains(WebgraphSchema.harvestkey_s) ? "" : WebgraphSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") + WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
-BlockingQueue<SolrDocument> docs = webgraphConnector.concurrentDocumentsByQuery(query, 0, 10000000, 1800000, 100);
-SolrDocument doc;
-String protocol, urlstub, id;
-DigestURL url;
-int proccount = 0, proccount_clickdepthchange = 0;
-try {
-while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
-// for each to-be-processed entry work on the process tag
-Collection<Object> proctags = doc.getFieldValues(WebgraphSchema.process_sxt.getSolrFieldName());
-try {
-SolrInputDocument sid = this.toSolrInputDocument(doc);
-//boolean changed = false;
-for (Object tag: proctags) {
-// switch over tag types
-ProcessType tagtype = ProcessType.valueOf((String) tag);
-if (tagtype == ProcessType.CLICKDEPTH) {
-if (this.contains(WebgraphSchema.source_clickdepth_i) && this.contains(WebgraphSchema.source_protocol_s) && this.contains(WebgraphSchema.source_urlstub_s) && this.contains(WebgraphSchema.source_id_s)) {
-protocol = (String) doc.getFieldValue(WebgraphSchema.source_protocol_s.getSolrFieldName());
-urlstub = (String) doc.getFieldValue(WebgraphSchema.source_urlstub_s.getSolrFieldName());
-id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName());
-url = new DigestURL(protocol + "://" + urlstub, ASCII.getBytes(id));
-if (postprocessing_clickdepth(clickdepthCache, sid, url, WebgraphSchema.source_clickdepth_i, 100)) {
-proccount_clickdepthchange++;
-//changed = true;
-}
-//ConcurrentLog.info("WebgraphConfiguration", "postprocessing webgraph source id " + id + ", url=" + protocol + "://" + urlstub + ", result: " + (changed ? "changed" : "not changed"));
-}
-if (this.contains(WebgraphSchema.target_clickdepth_i) && this.contains(WebgraphSchema.target_protocol_s) && this.contains(WebgraphSchema.target_urlstub_s) && this.contains(WebgraphSchema.target_id_s)) {
-protocol = (String) doc.getFieldValue(WebgraphSchema.target_protocol_s.getSolrFieldName());
-urlstub = (String) doc.getFieldValue(WebgraphSchema.target_urlstub_s.getSolrFieldName());
-id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName());
-url = new DigestURL(protocol + "://" + urlstub, ASCII.getBytes(id));
-if (postprocessing_clickdepth(clickdepthCache, sid, url, WebgraphSchema.target_clickdepth_i, 100)) {
-proccount_clickdepthchange++;
-//changed = true;
-}
-//ConcurrentLog.info("WebgraphConfiguration", "postprocessing webgraph target id " + id + ", url=" + protocol + "://" + urlstub + ", result: " + (changed ? "changed" : "not changed"));
-}
-}
-}
-// all processing steps checked, remove the processing tag
-sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName());
-if (this.contains(WebgraphSchema.harvestkey_s)) sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName());
-// send back to index
-webgraphConnector.add(sid);
-proccount++;
-} catch (Throwable e1) {
-ConcurrentLog.warn(WebgraphConfiguration.class.getName(), "postprocessing failed", e1);
-}
-}
-ConcurrentLog.info("WebgraphConfiguration", "cleanup_processing: re-calculated " + proccount + " new documents, " + proccount_clickdepthchange + " clickdepth values changed.");
-} catch (final InterruptedException e) {
-}
-return proccount;
-}
/**
* encode a string containing attributes from anchor rel properties binary:
