From 51800007c4265eda564883b7fc48ca86de811d48 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Thu, 6 Mar 2014 01:43:48 +0100 Subject: [PATCH] - added concurrency to postprocessing of webgraph document - bundled separate webgraph postprocessing steps into one --- htroot/HostBrowser.java | 2 +- htroot/IndexDeletion_p.java | 2 +- .../federate/solr/SchemaConfiguration.java | 4 +- .../solr/connector/AbstractSolrConnector.java | 34 +++- .../ConcurrentUpdateSolrConnector.java | 8 +- .../solr/connector/EmbeddedSolrConnector.java | 5 +- .../solr/connector/MirrorSolrConnector.java | 8 +- .../solr/connector/SolrConnector.java | 20 ++- source/net/yacy/search/Switchboard.java | 20 +-- source/net/yacy/search/index/Fulltext.java | 9 +- source/net/yacy/search/index/Segment.java | 16 +- .../schema/CollectionConfiguration.java | 165 ++++++++++++------ .../search/schema/WebgraphConfiguration.java | 73 -------- 13 files changed, 183 insertions(+), 183 deletions(-) diff --git a/htroot/HostBrowser.java b/htroot/HostBrowser.java index 0f730cbfc..c4ff62fad 100644 --- a/htroot/HostBrowser.java +++ b/htroot/HostBrowser.java @@ -272,7 +272,7 @@ public class HostBrowser { q.append(" AND ").append(CollectionSchema.url_paths_sxt.getSolrFieldName()).append(AbstractSolrConnector.CATCHALL_DTERM); } } - BlockingQueue docs = fulltext.getDefaultConnector().concurrentDocumentsByQuery(q.toString(), 0, 100000, TIMEOUT, 100, + BlockingQueue docs = fulltext.getDefaultConnector().concurrentDocumentsByQuery(q.toString(), 0, 100000, TIMEOUT, 100, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.failreason_s.getSolrFieldName(), diff --git a/htroot/IndexDeletion_p.java b/htroot/IndexDeletion_p.java index 40ddcaeda..e569e8116 100644 --- a/htroot/IndexDeletion_p.java +++ b/htroot/IndexDeletion_p.java @@ -130,7 +130,7 @@ public class IndexDeletion_p { } try { DigestURL u = new DigestURL(urlStub); - BlockingQueue dq = 
defaultConnector.concurrentDocumentsByQuery(CollectionSchema.host_s.getSolrFieldName() + ":\"" + u.getHost() + "\"", 0, 100000000, Long.MAX_VALUE, 100, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); + BlockingQueue dq = defaultConnector.concurrentDocumentsByQuery(CollectionSchema.host_s.getSolrFieldName() + ":\"" + u.getHost() + "\"", 0, 100000000, Long.MAX_VALUE, 100, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); SolrDocument doc; try { while ((doc = dq.take()) != AbstractSolrConnector.POISON_DOCUMENT) { diff --git a/source/net/yacy/cora/federate/solr/SchemaConfiguration.java b/source/net/yacy/cora/federate/solr/SchemaConfiguration.java index 47a5f63c2..fc9bf0472 100644 --- a/source/net/yacy/cora/federate/solr/SchemaConfiguration.java +++ b/source/net/yacy/cora/federate/solr/SchemaConfiguration.java @@ -178,7 +178,7 @@ public class SchemaConfiguration extends Configuration implements Serializable { return changed; } - public boolean postprocessing_clickdepth(ClickdepthCache clickdepthCache, SolrInputDocument sid, DigestURL url, SchemaDeclaration clickdepthfield, int maxtime) { + public boolean postprocessing_clickdepth(final ClickdepthCache clickdepthCache, final SolrInputDocument sid, final DigestURL url, final SchemaDeclaration clickdepthfield, final int maxtime) { if (!this.contains(clickdepthfield)) return false; // get new click depth and compare with old Integer oldclickdepth = (Integer) sid.getFieldValue(clickdepthfield.getSolrFieldName()); @@ -194,7 +194,7 @@ public class SchemaConfiguration extends Configuration implements Serializable { return false; } - public boolean postprocessing_references(ReferenceReportCache rrCache, SolrInputDocument sid, DigestURL url, Map hostExtentCount) { + public boolean postprocessing_references(final ReferenceReportCache rrCache, final SolrInputDocument sid, final DigestURL url, final Map hostExtentCount) { if 
(!(this.contains(CollectionSchema.references_i) || this.contains(CollectionSchema.references_internal_i) || this.contains(CollectionSchema.references_external_i) || this.contains(CollectionSchema.references_exthosts_i))) return false; diff --git a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java index cca65fc23..da013f29f 100644 --- a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java @@ -138,10 +138,18 @@ public abstract class AbstractSolrConnector implements SolrConnector { * @param maxcount the maximum number of results * @param maxtime the maximum time in milliseconds * @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used + * @param concurrency is the number of AbstractSolrConnector.POISON_DOCUMENT entries to add at the end of the feed * @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element */ @Override - public BlockingQueue concurrentDocumentsByQuery(final String querystring, final int offset, final int maxcount, final long maxtime, final int buffersize, final String ... fields) { + public BlockingQueue concurrentDocumentsByQuery( + final String querystring, + final int offset, + final int maxcount, + final long maxtime, + final int buffersize, + final int concurrency, + final String ... fields) { final BlockingQueue queue = buffersize <= 0 ? new LinkedBlockingQueue() : new ArrayBlockingQueue(buffersize); final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity! 
final Thread t = new Thread() { @@ -157,7 +165,7 @@ public abstract class AbstractSolrConnector implements SolrConnector { try {queue.put(d);} catch (final InterruptedException e) {break;} count++; } - if (sdl.size() <= 0) break; + if (sdl.size() < pagesize) break; o += sdl.size(); } catch (final SolrException e) { break; @@ -165,7 +173,9 @@ public abstract class AbstractSolrConnector implements SolrConnector { break; } } - try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {} + for (int i = 0; i < concurrency; i++) { + try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {} + } } }; t.start(); @@ -173,8 +183,14 @@ public abstract class AbstractSolrConnector implements SolrConnector { } @Override - public BlockingQueue concurrentIDsByQuery(final String querystring, final int offset, final int maxcount, final long maxtime) { - final BlockingQueue queue = new LinkedBlockingQueue(); + public BlockingQueue concurrentIDsByQuery( + final String querystring, + final int offset, + final int maxcount, + final long maxtime, + final int buffersize, + final int concurrency) { + final BlockingQueue queue = buffersize <= 0 ? new LinkedBlockingQueue() : new ArrayBlockingQueue(buffersize); final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity! 
final Thread t = new Thread() { @Override @@ -187,7 +203,7 @@ public abstract class AbstractSolrConnector implements SolrConnector { for (SolrDocument d: sdl) { try {queue.put((String) d.getFieldValue(CollectionSchema.id.getSolrFieldName()));} catch (final InterruptedException e) {break;} } - if (sdl.size() <= 0) break; + if (sdl.size() < pagesize) break; o += sdl.size(); } catch (final SolrException e) { break; @@ -195,7 +211,9 @@ public abstract class AbstractSolrConnector implements SolrConnector { break; } } - try {queue.put(AbstractSolrConnector.POISON_ID);} catch (final InterruptedException e1) {} + for (int i = 0; i < concurrency; i++) { + try {queue.put(AbstractSolrConnector.POISON_ID);} catch (final InterruptedException e1) {} + } } }; t.start(); @@ -204,7 +222,7 @@ public abstract class AbstractSolrConnector implements SolrConnector { @Override public Iterator iterator() { - final BlockingQueue queue = concurrentIDsByQuery(CATCHALL_QUERY, 0, Integer.MAX_VALUE, 60000); + final BlockingQueue queue = concurrentIDsByQuery(CATCHALL_QUERY, 0, Integer.MAX_VALUE, 60000, 2 * pagesize, 1); return new LookAheadIterator() { @Override protected String next0() { diff --git a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java index 5d8c80dad..7a2844aea 100644 --- a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java @@ -415,13 +415,13 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { } @Override - public BlockingQueue concurrentDocumentsByQuery(String querystring, int offset, int maxcount, long maxtime, int buffersize, String... 
fields) { - return this.connector.concurrentDocumentsByQuery(querystring, offset, maxcount, maxtime, buffersize, fields); + public BlockingQueue concurrentDocumentsByQuery(String querystring, int offset, int maxcount, long maxtime, int buffersize, final int concurrency, String... fields) { + return this.connector.concurrentDocumentsByQuery(querystring, offset, maxcount, maxtime, buffersize, concurrency, fields); } @Override - public BlockingQueue concurrentIDsByQuery(String querystring, int offset, int maxcount, long maxtime) { - return this.connector.concurrentIDsByQuery(querystring, offset, maxcount, maxtime); + public BlockingQueue concurrentIDsByQuery(String querystring, int offset, int maxcount, long maxtime, int buffersize, final int concurrency) { + return this.connector.concurrentIDsByQuery(querystring, offset, maxcount, maxtime, buffersize, concurrency); } } diff --git a/source/net/yacy/cora/federate/solr/connector/EmbeddedSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/EmbeddedSolrConnector.java index 8c035d5b6..69a825b9e 100644 --- a/source/net/yacy/cora/federate/solr/connector/EmbeddedSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/EmbeddedSolrConnector.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Map; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -427,8 +428,8 @@ public class EmbeddedSolrConnector extends SolrServerConnector implements SolrCo } @Override - public synchronized BlockingQueue concurrentIDsByQuery(final String querystring, final int offset, final int maxcount, final long maxtime) { - final BlockingQueue queue = new LinkedBlockingQueue(); + public synchronized BlockingQueue concurrentIDsByQuery(final String querystring, final int offset, final int maxcount, final long maxtime, final int buffersize, final int concurrency) { + final 
BlockingQueue queue = buffersize <= 0 ? new LinkedBlockingQueue() : new ArrayBlockingQueue(buffersize); final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity! final Thread t = new Thread() { @Override diff --git a/source/net/yacy/cora/federate/solr/connector/MirrorSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/MirrorSolrConnector.java index fa38beae7..045f0bee5 100644 --- a/source/net/yacy/cora/federate/solr/connector/MirrorSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/MirrorSolrConnector.java @@ -427,10 +427,10 @@ public class MirrorSolrConnector extends AbstractSolrConnector implements SolrCo } @Override - public BlockingQueue concurrentIDsByQuery(String querystring, int offset, int maxcount, long maxtime) { - if (this.solr0 != null && this.solr1 == null) return this.solr0.concurrentIDsByQuery(querystring, offset, maxcount, maxtime); - if (this.solr0 == null && this.solr1 != null) return this.solr1.concurrentIDsByQuery(querystring, offset, maxcount, maxtime); - return super.concurrentIDsByQuery(querystring, offset, maxcount, maxtime); + public BlockingQueue concurrentIDsByQuery(final String querystring, final int offset, final int maxcount, final long maxtime, final int buffersize, final int concurrency) { + if (this.solr0 != null && this.solr1 == null) return this.solr0.concurrentIDsByQuery(querystring, offset, maxcount, maxtime, buffersize, concurrency); + if (this.solr0 == null && this.solr1 != null) return this.solr1.concurrentIDsByQuery(querystring, offset, maxcount, maxtime, buffersize, concurrency); + return super.concurrentIDsByQuery(querystring, offset, maxcount, maxtime, buffersize, concurrency); } } diff --git a/source/net/yacy/cora/federate/solr/connector/SolrConnector.java b/source/net/yacy/cora/federate/solr/connector/SolrConnector.java index 125fd32ac..dcc9d1d06 100644 --- a/source/net/yacy/cora/federate/solr/connector/SolrConnector.java 
+++ b/source/net/yacy/cora/federate/solr/connector/SolrConnector.java @@ -214,10 +214,18 @@ public interface SolrConnector extends Iterable /* Iterable of document * @param maxcount the maximum number of results * @param maxtime the maximum time in milliseconds * @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used + * @param concurrency is the number of AbstractSolrConnector.POISON_DOCUMENT entries to add at the end of the feed * @param fields list of fields * @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element */ - public BlockingQueue concurrentDocumentsByQuery(final String querystring, final int offset, final int maxcount, final long maxtime, final int buffersize, final String ... fields); + public BlockingQueue concurrentDocumentsByQuery( + final String querystring, + final int offset, + final int maxcount, + final long maxtime, + final int buffersize, + final int concurrency, + final String ... fields); /** * get a document id result stream from a solr query. 
@@ -226,8 +234,16 @@ public interface SolrConnector extends Iterable /* Iterable of document * @param querystring * @param offset * @param maxcount + * @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used + * @param concurrency is the number of AbstractSolrConnector.POISON_ID entries to add at the end of the feed * @return */ - public BlockingQueue concurrentIDsByQuery(final String querystring, final int offset, final int maxcount, final long maxtime); + public BlockingQueue concurrentIDsByQuery( + final String querystring, + final int offset, + final int maxcount, + final long maxtime, + final int buffersize, + final int concurrency); } diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java index e17f7dbd7..b5589d18a 100644 --- a/source/net/yacy/search/Switchboard.java +++ b/source/net/yacy/search/Switchboard.java @@ -2292,7 +2292,6 @@ public final class Switchboard extends serverSwitch { // execute the (post-) processing steps for all entries that have a process tag assigned Fulltext fulltext = index.fulltext(); CollectionConfiguration collection1Configuration = fulltext.getDefaultConfiguration(); - WebgraphConfiguration webgraphConfiguration = fulltext.getWebgraphConfiguration(); if (!this.crawlJobIsPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL) && MemoryControl.available() > 512L * 1024L * 1024L && Memory.load() < 2.5f) { // we optimize first because that is useful for postprocessing @@ -2302,10 +2301,9 @@ public final class Switchboard extends serverSwitch { Set deletionCandidates = collection1Configuration.contains(CollectionSchema.harvestkey_s.getSolrFieldName()) ? 
this.crawler.getFinishesProfiles(this.crawlQueues) : new HashSet(); int cleanupByHarvestkey = deletionCandidates.size(); - boolean processCollection = collection1Configuration.contains(CollectionSchema.process_sxt) && (index.connectedCitation() || fulltext.useWebgraph()); - boolean processWebgraph = webgraphConfiguration.contains(WebgraphSchema.process_sxt) && fulltext.useWebgraph(); + boolean postprocessing = collection1Configuration.contains(CollectionSchema.process_sxt) && (index.connectedCitation() || fulltext.useWebgraph()); boolean allCrawlsFinished = this.crawler.allCrawlsFinished(this.crawlQueues); - if ((processCollection || processWebgraph) && (cleanupByHarvestkey > 0 || allCrawlsFinished)) { + if (postprocessing && (cleanupByHarvestkey > 0 || allCrawlsFinished)) { if (cleanupByHarvestkey > 0) { // run postprocessing on these profiles postprocessingRunning = true; @@ -2315,13 +2313,6 @@ public final class Switchboard extends serverSwitch { postprocessingStartTime[0] = 0; try {postprocessingCount[0] = (int) fulltext.getDefaultConnector().getCountByQuery(CollectionSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM);} catch (IOException e) {} // should be zero but you never know - if (processWebgraph) { - postprocessingStartTime[1] = System.currentTimeMillis(); - try {postprocessingCount[1] = (int) fulltext.getWebgraphConnector().getCountByQuery(WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM);} catch (IOException e) {} - for (String profileHash: deletionCandidates) proccount += webgraphConfiguration.postprocessing(index, clickdepthCache, profileHash); - postprocessingStartTime[1] = 0; - try {postprocessingCount[1] = (int) fulltext.getWebgraphConnector().getCountByQuery(WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM);} catch (IOException e) {} - } this.crawler.cleanProfiles(deletionCandidates); log.info("cleanup removed " + cleanupByHarvestkey + " crawl 
profiles, post-processed " + proccount + " documents"); } else if (allCrawlsFinished) { @@ -2333,13 +2324,6 @@ public final class Switchboard extends serverSwitch { postprocessingStartTime[0] = 0; try {postprocessingCount[0] = (int) fulltext.getDefaultConnector().getCountByQuery(CollectionSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM);} catch (IOException e) {} // should be zero but you never know - if (processWebgraph) { - postprocessingStartTime[1] = System.currentTimeMillis(); - try {postprocessingCount[1] = (int) fulltext.getWebgraphConnector().getCountByQuery(WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM);} catch (IOException e) {} - proccount += webgraphConfiguration.postprocessing(index, clickdepthCache, null); - postprocessingStartTime[1] = 0; - try {postprocessingCount[1] = (int) fulltext.getWebgraphConnector().getCountByQuery(WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM);} catch (IOException e) {} - } this.crawler.cleanProfiles(this.crawler.getActiveProfiles()); log.info("cleanup post-processed " + proccount + " documents"); } diff --git a/source/net/yacy/search/index/Fulltext.java b/source/net/yacy/search/index/Fulltext.java index ed06dcd59..6c227190b 100644 --- a/source/net/yacy/search/index/Fulltext.java +++ b/source/net/yacy/search/index/Fulltext.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -426,16 +427,18 @@ public final class Fulltext { final String collectionQuery = CollectionSchema.host_s.getSolrFieldName() + ":\"" + host + "\"" + ((freshdate != null && freshdate.before(new Date())) ? 
(" AND " + CollectionSchema.load_date_dt.getSolrFieldName() + ":[* TO " + ISO8601Formatter.FORMATTER.format(freshdate) + "]") : ""); final AtomicInteger count = new AtomicInteger(0); - final BlockingQueue docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(collectionQuery, 0, 1000000, 600000, -1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); + final BlockingQueue docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(collectionQuery, 0, 1000000, 600000, 100, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); try { + Set deleteIDs = new HashSet(); SolrDocument doc; while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) { String u = (String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName()); if (u.startsWith(basepath)) { - remove(ASCII.getBytes((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()))); + deleteIDs.add((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName())); count.incrementAndGet(); } } + remove(deleteIDs); if (count.get() > 0) Fulltext.this.commit(true); } catch (final InterruptedException e) {} return count.get(); @@ -660,7 +663,7 @@ public final class Fulltext { this.count++; } } else { - BlockingQueue docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(this.query + " AND " + CollectionSchema.httpstatus_i.getSolrFieldName() + ":200", 0, 100000000, 10 * 60 * 60 * 1000, 100, + BlockingQueue docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(this.query + " AND " + CollectionSchema.httpstatus_i.getSolrFieldName() + ":200", 0, 100000000, 10 * 60 * 60 * 1000, 100, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.title.getSolrFieldName(), CollectionSchema.author.getSolrFieldName(), CollectionSchema.description_txt.getSolrFieldName(), CollectionSchema.size_i.getSolrFieldName(), 
CollectionSchema.last_modified.getSolrFieldName()); SolrDocument doc; diff --git a/source/net/yacy/search/index/Segment.java b/source/net/yacy/search/index/Segment.java index d1dfe3ad7..215c83dd0 100644 --- a/source/net/yacy/search/index/Segment.java +++ b/source/net/yacy/search/index/Segment.java @@ -30,13 +30,13 @@ import java.io.File; import java.io.IOException; import java.net.MalformedURLException; import java.util.Date; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; import org.apache.solr.common.SolrDocument; @@ -287,7 +287,7 @@ public class Segment { public class ReferenceReportCache { private final Map cache; public ReferenceReportCache() { - this.cache = new HashMap(); + this.cache = new ConcurrentHashMap(); } public ReferenceReport getReferenceReport(final String id, final boolean acceptSelfReference) throws IOException { ReferenceReport rr = cache.get(id); @@ -309,11 +309,11 @@ public class Segment { } public class ClickdepthCache { - ReferenceReportCache rrc; - Map cache; + final ReferenceReportCache rrc; + final Map cache; public ClickdepthCache(ReferenceReportCache rrc) { this.rrc = rrc; - this.cache = new HashMap(); + this.cache = new ConcurrentHashMap(); } public int getClickdepth(final DigestURL url, int maxtime) throws IOException { Integer clickdepth = cache.get(ASCII.String(url.hash())); @@ -371,7 +371,7 @@ public class Segment { if ((internalIDs.size() == 0 || !connectedCitation()) && Segment.this.fulltext.useWebgraph()) { // reqd the references from the webgraph SolrConnector webgraph = Segment.this.fulltext.getWebgraphConnector(); - BlockingQueue docs = webgraph.concurrentDocumentsByQuery("{!raw f=" + WebgraphSchema.target_id_s.getSolrFieldName() + "}" + ASCII.String(id), 0, 10000000, 1000, 100, 
WebgraphSchema.source_id_s.getSolrFieldName()); + BlockingQueue docs = webgraph.concurrentDocumentsByQuery("{!raw f=" + WebgraphSchema.target_id_s.getSolrFieldName() + "}" + ASCII.String(id), 0, 10000000, 1000, 100, 1, WebgraphSchema.source_id_s.getSolrFieldName()); SolrDocument doc; try { while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) { @@ -474,12 +474,12 @@ public class Segment { final BlockingQueue docQueue; final String urlstub; if (stub == null) { - docQueue = this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(AbstractSolrConnector.CATCHALL_QUERY, 0, Integer.MAX_VALUE, maxtime, maxcount, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); + docQueue = this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(AbstractSolrConnector.CATCHALL_QUERY, 0, Integer.MAX_VALUE, maxtime, maxcount, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); urlstub = null; } else { final String host = stub.getHost(); String hh = DigestURL.hosthash(host); - docQueue = this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(CollectionSchema.host_id_s + ":\"" + hh + "\"", 0, Integer.MAX_VALUE, maxtime, maxcount, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); + docQueue = this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(CollectionSchema.host_id_s + ":\"" + hh + "\"", 0, Integer.MAX_VALUE, maxtime, maxcount, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); urlstub = stub.toNormalform(true); } diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java index 733df2487..9e45ef3b9 100644 --- a/source/net/yacy/search/schema/CollectionConfiguration.java +++ b/source/net/yacy/search/schema/CollectionConfiguration.java @@ -42,6 +42,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import 
java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import net.yacy.cora.document.analysis.EnhancedTextProfileSignature; @@ -913,14 +915,14 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri * @param urlCitation * @return */ - public int postprocessing(final Segment segment, ReferenceReportCache rrCache, ClickdepthCache clickdepthCache, String harvestkey) { + public int postprocessing(final Segment segment, final ReferenceReportCache rrCache, final ClickdepthCache clickdepthCache, final String harvestkey) { if (!this.contains(CollectionSchema.process_sxt)) return 0; if (!segment.connectedCitation() && !segment.fulltext().useWebgraph()) return 0; - SolrConnector collectionConnector = segment.fulltext().getDefaultConnector(); + final SolrConnector collectionConnector = segment.fulltext().getDefaultConnector(); collectionConnector.commit(false); // make sure that we have latest information that can be found if (segment.fulltext().useWebgraph()) segment.fulltext().getWebgraphConnector().commit(false); - CollectionConfiguration collection = segment.fulltext().getDefaultConfiguration(); - WebgraphConfiguration webgraph = segment.fulltext().getWebgraphConfiguration(); + final CollectionConfiguration collection = segment.fulltext().getDefaultConfiguration(); + final WebgraphConfiguration webgraph = segment.fulltext().getWebgraphConfiguration(); // collect hosts from index which shall take part in citation computation @@ -936,15 +938,15 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri } // create the ranking map - Map rankings = null; + final Map rankings = new ConcurrentHashMap(); if ((segment.fulltext().useWebgraph() && ((webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) || (webgraph.contains(WebgraphSchema.target_id_s) && 
webgraph.contains(WebgraphSchema.target_cr_host_norm_i))) || (collection.contains(CollectionSchema.cr_host_count_i) && collection.contains(CollectionSchema.cr_host_chance_d) && collection.contains(CollectionSchema.cr_host_norm_i)))) try { - ConcurrentLog.info("CollectionConfiguration", "collecting " + hostscore.size() + " hosts"); - rankings = new HashMap(); + int concurrency = Math.min(hostscore.size(), Runtime.getRuntime().availableProcessors()); + ConcurrentLog.info("CollectionConfiguration", "collecting " + hostscore.size() + " hosts, concurrency = " + concurrency); int countcheck = 0; for (String host: hostscore.keyList(true)) { // Patch the citation index for links with canonical tags. @@ -953,7 +955,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri // To do so, we first must collect all canonical links, find all references to them, get the anchor list of the documents and patch the citation reference of these links String patchquery = CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM; long patchquerycount = collectionConnector.getCountByQuery(patchquery); - BlockingQueue documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, 0, 10000000, 600000, 100, + BlockingQueue documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, 0, 10000000, 600000, 200, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.canonical_s.getSolrFieldName()); SolrDocument doc_B; int patchquerycountcheck = 0; @@ -1020,63 +1022,111 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri } // process all documents at the webgraph for the outgoing links of this document - SolrDocument doc; - int allcount = 0; + final AtomicInteger allcount = new AtomicInteger(0); if (segment.fulltext().useWebgraph()) { - Set 
omitFields = new HashSet(); + final Set omitFields = new HashSet(); omitFields.add(WebgraphSchema.process_sxt.getSolrFieldName()); omitFields.add(WebgraphSchema.harvestkey_s.getSolrFieldName()); try { - int proccount = 0; - long start = System.currentTimeMillis(); + final long start = System.currentTimeMillis(); for (String host: hostscore.keyList(true)) { if (hostscore.get(host) <= 0) continue; + final String hostfinal = host; // select all webgraph edges and modify their cr value query = WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + host + "\" AND " + WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM; - long count = segment.fulltext().getWebgraphConnector().getCountByQuery(query); - ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph"); - BlockingQueue docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(query, 0, 10000000, 1800000, 100); - int countcheck = 0; - while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) { - SolrInputDocument sid = webgraph.toSolrInputDocument(doc, omitFields); - if (webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) { - String id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName()); - CRV crv = rankings.get(id); - if (crv != null) { - sid.setField(WebgraphSchema.source_cr_host_norm_i.getSolrFieldName(), crv.crn); + final long count = segment.fulltext().getWebgraphConnector().getCountByQuery(query); + int concurrency = Math.min((int) count, Math.max(1, Runtime.getRuntime().availableProcessors() / 4)); + ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph, concurrency = " + concurrency); + final BlockingQueue docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(query, 0, 10000000, 1800000, 200, concurrency); + final AtomicInteger proccount = new 
AtomicInteger(0); + Thread[] t = new Thread[concurrency]; + for (final AtomicInteger i = new AtomicInteger(0); i.get() < t.length; i.incrementAndGet()) { + t[i.get()] = new Thread() { + private String name = "CollectionConfiguration.postprocessing.webgraph-" + i.get(); + public void run() { + Thread.currentThread().setName(name); + SolrDocument doc; String protocol, urlstub, id; DigestURL url; + try { + while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) { + SolrInputDocument sid = webgraph.toSolrInputDocument(doc, omitFields); + Collection proctags = doc.getFieldValues(WebgraphSchema.process_sxt.getSolrFieldName()); + Set process = new HashSet(); + for (Object tag: proctags) { + ProcessType tagtype = ProcessType.valueOf((String) tag); + process.add(tagtype); + } + + // set cr values + if (webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) { + id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName()); + CRV crv = rankings.get(id); + if (crv != null) { + sid.setField(WebgraphSchema.source_cr_host_norm_i.getSolrFieldName(), crv.crn); + } + } + if (webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i)) { + id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName()); + CRV crv = rankings.get(id); + if (crv != null) { + sid.setField(WebgraphSchema.target_cr_host_norm_i.getSolrFieldName(), crv.crn); + } + } + + // set clickdepth + if (process.contains(ProcessType.CLICKDEPTH)) { + if (webgraph.contains(WebgraphSchema.source_clickdepth_i) && webgraph.contains(WebgraphSchema.source_protocol_s) && webgraph.contains(WebgraphSchema.source_urlstub_s) && webgraph.contains(WebgraphSchema.source_id_s)) { + protocol = (String) doc.getFieldValue(WebgraphSchema.source_protocol_s.getSolrFieldName()); + urlstub = (String) doc.getFieldValue(WebgraphSchema.source_urlstub_s.getSolrFieldName()); + id = (String) 
doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName()); + try { + url = new DigestURL(protocol + "://" + urlstub, ASCII.getBytes(id)); + postprocessing_clickdepth(clickdepthCache, sid, url, WebgraphSchema.source_clickdepth_i, 100); + } catch (MalformedURLException e) { + } + } + if (webgraph.contains(WebgraphSchema.target_clickdepth_i) && webgraph.contains(WebgraphSchema.target_protocol_s) && webgraph.contains(WebgraphSchema.target_urlstub_s) && webgraph.contains(WebgraphSchema.target_id_s)) { + protocol = (String) doc.getFieldValue(WebgraphSchema.target_protocol_s.getSolrFieldName()); + urlstub = (String) doc.getFieldValue(WebgraphSchema.target_urlstub_s.getSolrFieldName()); + id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName()); + try { + url = new DigestURL(protocol + "://" + urlstub, ASCII.getBytes(id)); + postprocessing_clickdepth(clickdepthCache, sid, url, WebgraphSchema.target_clickdepth_i, 100); + } catch (MalformedURLException e) { + } + } + } + + // write document back to index + try { + sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName()); + sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName()); + segment.fulltext().getWebgraphConnector().deleteById((String) sid.getFieldValue(WebgraphSchema.id.getSolrFieldName())); + segment.fulltext().getWebgraphConnector().add(sid); + } catch (SolrException e) { + ConcurrentLog.logException(e); + } catch (IOException e) { + ConcurrentLog.logException(e); + } + proccount.incrementAndGet(); + allcount.incrementAndGet(); + if (proccount.get() % 1000 == 0) ConcurrentLog.info( + "CollectionConfiguration", "webgraph - postprocessed " + proccount + " from " + count + " documents; " + + (proccount.get() * 1000 / (System.currentTimeMillis() - start)) + " docs/second; " + + ((System.currentTimeMillis() - start) * (count - proccount.get()) / proccount.get() / 60000) + " minutes remaining for host " + hostfinal); + } + } catch (InterruptedException e) { + 
ConcurrentLog.warn("CollectionConfiguration", e.getMessage(), e); + } } - } - if (webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i)) { - String id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName()); - CRV crv = rankings.get(id); - if (crv != null) { - sid.setField(WebgraphSchema.target_cr_host_norm_i.getSolrFieldName(), crv.crn); - } - } - try { - sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName()); - sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName()); - segment.fulltext().getWebgraphConnector().deleteById((String) sid.getFieldValue(WebgraphSchema.id.getSolrFieldName())); - segment.fulltext().getWebgraphConnector().add(sid); - } catch (SolrException e) { - ConcurrentLog.logException(e); - } catch (IOException e) { - ConcurrentLog.logException(e); - } - countcheck++; - proccount++; allcount++; - if (proccount % 1000 == 0) ConcurrentLog.info( - "CollectionConfiguration", "webgraph - postprocessed " + proccount + " from " + count + " documents; " + - (proccount * 1000 / (System.currentTimeMillis() - start)) + " docs/second; " + - ((System.currentTimeMillis() - start) * (count - proccount) / proccount / 60000) + " minutes remaining"); + }; + t[i.get()].start(); } + for (int i = 0; i < t.length; i++) try {t[i].join();} catch (InterruptedException e) {} - if (count != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous webgraph document count for host " + host + ": expected=" + count + ", counted=" + countcheck); + if (count != proccount.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous webgraph document count for host " + host + ": expected=" + count + ", counted=" + proccount); } } catch (final IOException e2) { ConcurrentLog.warn("CollectionConfiguration", e2.getMessage(), e2); - } catch (final InterruptedException e3) { - ConcurrentLog.warn("CollectionConfiguration", e3.getMessage(), e3); } } @@ -1093,9 +1143,10 @@ public class 
CollectionConfiguration extends SchemaConfiguration implements Seri long count = collectionConnector.getCountByQuery(query); long start = System.currentTimeMillis(); ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the collection for harvestkey " + harvestkey); - BlockingQueue docs = collectionConnector.concurrentDocumentsByQuery(query, 0, 10000000, 1800000, 100); + BlockingQueue docs = collectionConnector.concurrentDocumentsByQuery(query, 0, 10000000, 1800000, 200, 1); int countcheck = 0; Collection failids = new ArrayList(); + SolrDocument doc; while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) { // for each to-be-processed entry work on the process tag Collection proctags = doc.getFieldValues(CollectionSchema.process_sxt.getSolrFieldName()); @@ -1118,7 +1169,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri collection.contains(CollectionSchema.cr_host_count_i) && collection.contains(CollectionSchema.cr_host_chance_d) && collection.contains(CollectionSchema.cr_host_norm_i)) { - CRV crv = rankings.get(ASCII.String(id)); + CRV crv = rankings.remove(ASCII.String(id)); // instead of 'get'ting the CRV, we also remove it because we will not need it again and free some memory here if (crv != null) { sid.setField(CollectionSchema.cr_host_count_i.getSolrFieldName(), crv.count); sid.setField(CollectionSchema.cr_host_chance_d.getSolrFieldName(), crv.cr); @@ -1151,7 +1202,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri collectionConnector.deleteById(i); collectionConnector.add(sid); - proccount++; allcount++; + proccount++; allcount.incrementAndGet(); if (proccount % 100 == 0) ConcurrentLog.info( "CollectionConfiguration", "collection - postprocessed " + proccount + " from " + count + " documents; " + (proccount * 1000 / (System.currentTimeMillis() - start)) + " docs/second; " + @@ -1177,7 +1228,7 @@ public class CollectionConfiguration extends 
SchemaConfiguration implements Seri } catch (IOException e3) { ConcurrentLog.warn("CollectionConfiguration", e3.getMessage(), e3); } - return allcount; + return allcount.get(); } private static final class CRV { @@ -1211,10 +1262,10 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri this.rrCache = rrCache; this.converge_eq_factor = (int) Math.pow(10.0d, converge_digits); SolrConnector connector = segment.fulltext().getDefaultConnector(); - this.crt = new HashMap(); + this.crt = new ConcurrentHashMap(); try { // select all documents for each host - BlockingQueue ids = connector.concurrentIDsByQuery("{!raw f=" + CollectionSchema.host_s.getSolrFieldName() + "}" + host, 0, 10000000, 600000); + BlockingQueue ids = connector.concurrentIDsByQuery("{!raw f=" + CollectionSchema.host_s.getSolrFieldName() + "}" + host, 0, 10000000, 600000, 200, 1); String id; while ((id = ids.take()) != AbstractSolrConnector.POISON_ID) { this.crt.put(id, new double[]{0.0d,0.0d}); //{old value, new value} diff --git a/source/net/yacy/search/schema/WebgraphConfiguration.java b/source/net/yacy/search/schema/WebgraphConfiguration.java index 9ab160cba..66dddd21f 100644 --- a/source/net/yacy/search/schema/WebgraphConfiguration.java +++ b/source/net/yacy/search/schema/WebgraphConfiguration.java @@ -35,7 +35,6 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.BlockingQueue; import java.util.regex.Pattern; import org.apache.solr.common.SolrDocument; @@ -48,15 +47,11 @@ import net.yacy.cora.document.id.MultiProtocolURL; import net.yacy.cora.federate.solr.ProcessType; import net.yacy.cora.federate.solr.SchemaConfiguration; import net.yacy.cora.federate.solr.SchemaDeclaration; -import net.yacy.cora.federate.solr.connector.AbstractSolrConnector; -import net.yacy.cora.federate.solr.connector.SolrConnector; import net.yacy.cora.protocol.Domains; import net.yacy.cora.protocol.ResponseHeader; 
import net.yacy.cora.util.CommonPattern; import net.yacy.cora.util.ConcurrentLog; import net.yacy.document.parser.html.ImageEntry; -import net.yacy.search.index.Segment; -import net.yacy.search.index.Segment.ClickdepthCache; public class WebgraphConfiguration extends SchemaConfiguration implements Serializable { @@ -306,74 +301,6 @@ public class WebgraphConfiguration extends SchemaConfiguration implements Serial // return the edge return edge; } - - - public int postprocessing(final Segment segment, ClickdepthCache clickdepthCache, final String harvestkey) { - if (!this.contains(WebgraphSchema.process_sxt)) return 0; - if (!segment.fulltext().useWebgraph()) return 0; - SolrConnector webgraphConnector = segment.fulltext().getWebgraphConnector(); - // that means we must search for those entries. - webgraphConnector.commit(true); // make sure that we have latest information that can be found - //BlockingQueue docs = index.fulltext().getSolr().concurrentQuery("*:*", 0, 1000, 60000, 10); - String query = (harvestkey == null || !this.contains(WebgraphSchema.harvestkey_s) ? 
"" : WebgraphSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") + WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM; - BlockingQueue docs = webgraphConnector.concurrentDocumentsByQuery(query, 0, 10000000, 1800000, 100); - - SolrDocument doc; - String protocol, urlstub, id; - DigestURL url; - int proccount = 0, proccount_clickdepthchange = 0; - try { - while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) { - // for each to-be-processed entry work on the process tag - Collection proctags = doc.getFieldValues(WebgraphSchema.process_sxt.getSolrFieldName()); - - try { - SolrInputDocument sid = this.toSolrInputDocument(doc); - //boolean changed = false; - for (Object tag: proctags) { - - // switch over tag types - ProcessType tagtype = ProcessType.valueOf((String) tag); - if (tagtype == ProcessType.CLICKDEPTH) { - if (this.contains(WebgraphSchema.source_clickdepth_i) && this.contains(WebgraphSchema.source_protocol_s) && this.contains(WebgraphSchema.source_urlstub_s) && this.contains(WebgraphSchema.source_id_s)) { - protocol = (String) doc.getFieldValue(WebgraphSchema.source_protocol_s.getSolrFieldName()); - urlstub = (String) doc.getFieldValue(WebgraphSchema.source_urlstub_s.getSolrFieldName()); - id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName()); - url = new DigestURL(protocol + "://" + urlstub, ASCII.getBytes(id)); - if (postprocessing_clickdepth(clickdepthCache, sid, url, WebgraphSchema.source_clickdepth_i, 100)) { - proccount_clickdepthchange++; - //changed = true; - } - //ConcurrentLog.info("WebgraphConfiguration", "postprocessing webgraph source id " + id + ", url=" + protocol + "://" + urlstub + ", result: " + (changed ? 
"changed" : "not changed")); - } - if (this.contains(WebgraphSchema.target_clickdepth_i) && this.contains(WebgraphSchema.target_protocol_s) && this.contains(WebgraphSchema.target_urlstub_s) && this.contains(WebgraphSchema.target_id_s)) { - protocol = (String) doc.getFieldValue(WebgraphSchema.target_protocol_s.getSolrFieldName()); - urlstub = (String) doc.getFieldValue(WebgraphSchema.target_urlstub_s.getSolrFieldName()); - id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName()); - url = new DigestURL(protocol + "://" + urlstub, ASCII.getBytes(id)); - if (postprocessing_clickdepth(clickdepthCache, sid, url, WebgraphSchema.target_clickdepth_i, 100)) { - proccount_clickdepthchange++; - //changed = true; - } - //ConcurrentLog.info("WebgraphConfiguration", "postprocessing webgraph target id " + id + ", url=" + protocol + "://" + urlstub + ", result: " + (changed ? "changed" : "not changed")); - } - } - } - // all processing steps checked, remove the processing tag - sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName()); - if (this.contains(WebgraphSchema.harvestkey_s)) sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName()); - // send back to index - webgraphConnector.add(sid); - proccount++; - } catch (Throwable e1) { - ConcurrentLog.warn(WebgraphConfiguration.class.getName(), "postprocessing failed", e1); - } - } - ConcurrentLog.info("WebgraphConfiguration", "cleanup_processing: re-calculated " + proccount + " new documents, " + proccount_clickdepthchange + " clickdepth values changed."); - } catch (final InterruptedException e) { - } - return proccount; - } /** * encode a string containing attributes from anchor rel properties binary: