From 8dbc80da70d8aadca8830f969b1c36462aaa8ac9 Mon Sep 17 00:00:00 2001
From: Michael Peter Christen
Date: Fri, 17 May 2013 13:59:37 +0200
Subject: [PATCH] redesign of index.exist-test: existence is no longer tested
 one id at a time, but with a collection of ids. This causes only a single
 call to Solr instead of many. The result is much better performance when
 testing the existence of many urls, and it should also produce far less IO
 during index transmission, on both the sender and the receiver side.

---
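Note (comment area of the patch, illustration only, not part of the diff):
the core idea is to replace N single-document existence probes with one
batched Solr query. As a rough, illustrative calculation: at around 5 ms per
Solr round trip, testing 1,000 url hashes one at a time costs about 5
seconds, while a single batched query pays the round-trip overhead once.
A minimal sketch of the calling pattern; SolrLike is a made-up stand-in for
the connector interface touched below, not a YaCy class:

    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;

    class BatchingSketch {
        interface SolrLike {
            boolean existsById(String id);                   // old: one query per id
            Set<String> existsByIds(Collection<String> ids); // new: one query for all ids
        }

        // before this patch: N round trips to Solr
        static Set<String> existingSerial(SolrLike solr, Collection<String> ids) {
            Set<String> existing = new HashSet<String>();
            for (String id : ids) if (solr.existsById(id)) existing.add(id);
            return existing;
        }

        // after this patch: one round trip, Solr returns the subset that exists
        static Set<String> existingBatched(SolrLike solr, Collection<String> ids) {
            return solr.existsByIds(ids);
        }
    }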
 htroot/HostBrowser.java                          |   1 +
 htroot/IndexControlRWIs_p.java                   |   1 +
 htroot/Load_RSS_p.java                           |  36 ++-
 htroot/yacy/transferRWI.java                     |  28 ++-
 htroot/yacy/transferURL.java                     |  22 +-
 .../solr/connector/AbstractSolrConnector.java    |  45 ++++
 .../ConcurrentUpdateSolrConnector.java           |  21 ++
 .../solr/connector/SolrConnector.java            |  15 +-
 .../net/yacy/cora/protocol/ftp/FTPClient.java    |   3 +
 .../yacy/crawler/retrieval/HTTPLoader.java       |  29 +--
 .../net/yacy/crawler/retrieval/RSSLoader.java    |  30 ++-
 .../crawler/retrieval/SitemapImporter.java       |   1 +
 .../kelondro/workflow/WorkflowProcessor.java     |   2 +-
 source/net/yacy/migration.java                   |  11 -
 source/net/yacy/peers/Transmission.java          |  18 +-
 source/net/yacy/search/Switchboard.java          | 223 +++++++++---------
 source/net/yacy/search/index/Fulltext.java       |  26 ++
 source/net/yacy/search/index/Segment.java        |   6 +
 source/net/yacy/search/query/SearchEvent.java    |   4 +-
 source/net/yacy/server/serverObjects.java        |   6 +-
 20 files changed, 347 insertions(+), 181 deletions(-)

diff --git a/htroot/HostBrowser.java b/htroot/HostBrowser.java
index e1d2f169d..64d508970 100644
--- a/htroot/HostBrowser.java
+++ b/htroot/HostBrowser.java
@@ -62,6 +62,7 @@ public class HostBrowser {
         LINK, INDEX, EXCLUDED, FAILED;
     }
 
+    @SuppressWarnings("deprecation")
     public static serverObjects respond(final RequestHeader header, final serverObjects post, final serverSwitch env) {
         // return variable that accumulates replacements
         final Switchboard sb = (Switchboard) env;
diff --git a/htroot/IndexControlRWIs_p.java b/htroot/IndexControlRWIs_p.java
index c1035007a..934c0e5d6 100644
--- a/htroot/IndexControlRWIs_p.java
+++ b/htroot/IndexControlRWIs_p.java
@@ -76,6 +76,7 @@ public class IndexControlRWIs_p {
 
     private final static String errmsg = "not possible to compute word from hash";
 
+    @SuppressWarnings("deprecation")
    public static serverObjects respond(@SuppressWarnings("unused") final RequestHeader header, final serverObjects post, final serverSwitch env) {
         // return variable that accumulates replacements
         final Switchboard sb = (Switchboard) env;
diff --git a/htroot/Load_RSS_p.java b/htroot/Load_RSS_p.java
index ff500c60e..99bce927f 100644
--- a/htroot/Load_RSS_p.java
+++ b/htroot/Load_RSS_p.java
@@ -23,6 +23,7 @@
 import java.net.MalformedURLException;
 import java.text.DateFormat;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -38,11 +39,11 @@
 import net.yacy.cora.federate.yacy.CacheStrategy;
 import net.yacy.cora.protocol.RequestHeader;
 import net.yacy.cora.util.CommonPattern;
 import net.yacy.cora.util.SpaceExceededException;
+import net.yacy.crawler.HarvestProcess;
 import net.yacy.crawler.data.CrawlQueues;
 import net.yacy.crawler.retrieval.RSSLoader;
 import net.yacy.crawler.retrieval.Response;
 import net.yacy.data.WorkTables;
-import net.yacy.document.Parser.Failure;
 import net.yacy.kelondro.blob.Tables;
 import net.yacy.kelondro.blob.Tables.Row;
 import net.yacy.kelondro.data.meta.DigestURI;
@@ -275,20 +276,31 @@ public class Load_RSS_p {
         // index all selected items: description only
         if (rss != null && post.containsKey("indexSelectedItemContent")) {
             final RSSFeed feed = rss.getFeed();
+            List<DigestURI> list = new ArrayList<DigestURI>();
+            Map<String, RSSMessage> messages = new HashMap<String, RSSMessage>();
             loop: for (final Map.Entry<String, String> entry: post.entrySet()) {
                 if (entry.getValue().startsWith("mark_")) try {
                     final RSSMessage message = feed.getMessage(entry.getValue().substring(5));
                     final DigestURI messageurl = new DigestURI(message.getLink());
                     if (RSSLoader.indexTriggered.containsKey(messageurl.hash())) continue loop;
-                    if (sb.urlExists(ASCII.String(messageurl.hash())) != null) continue loop;
-                    sb.addToIndex(messageurl, null, null, collections);
-                    RSSLoader.indexTriggered.insertIfAbsent(messageurl.hash(), new Date());
+                    messages.put(ASCII.String(messageurl.hash()), message);
                 } catch (final IOException e) {
                     Log.logException(e);
-                } catch (final Failure e) {
+                }
+            }
+            Map<String, HarvestProcess> existingurls = sb.urlExists(messages.keySet());
+            loop: for (final Map.Entry<String, RSSMessage> entry: messages.entrySet()) {
+                try {
+                    final RSSMessage message = entry.getValue();
+                    final DigestURI messageurl = new DigestURI(message.getLink());
+                    if (existingurls.get(ASCII.String(messageurl.hash())) != null) continue loop;
+                    list.add(messageurl);
+                    RSSLoader.indexTriggered.insertIfAbsent(messageurl.hash(), new Date());
+                } catch (final IOException e) {
                     Log.logException(e);
                 }
             }
+            sb.addToIndex(list, null, null, collections);
         }
 
         if (rss != null && post.containsKey("indexAllItemContent")) {
@@ -318,6 +330,18 @@ public class Load_RSS_p {
             prop.putHTML("showitems_ttl", channel == null ? "" : channel.getTTL());
             prop.putHTML("showitems_docs", channel == null ? "" : channel.getDocs());
 
+            Map<String, DigestURI> urls = new HashMap<String, DigestURI>();
+            for (final Hit item: feed) {
+                try {
+                    final DigestURI messageurl = new DigestURI(item.getLink());
+                    urls.put(ASCII.String(messageurl.hash()), messageurl);
+                } catch (final MalformedURLException e) {
+                    Log.logException(e);
+                    continue;
+                }
+            }
+            Map<String, HarvestProcess> ids = sb.urlExists(urls.keySet());
+
             int i = 0;
             for (final Hit item: feed) {
                 try {
@@ -325,7 +349,7 @@ public class Load_RSS_p {
                     author = item.getAuthor();
                     if (author == null) author = item.getCopyright();
                     pubDate = item.getPubDate();
-                    prop.put("showitems_item_" + i + "_state", sb.urlExists(ASCII.String(messageurl.hash())) != null ? 2 : RSSLoader.indexTriggered.containsKey(messageurl.hash()) ? 1 : 0);
+                    prop.put("showitems_item_" + i + "_state", ids.get(ASCII.String(messageurl.hash())) != null ? 2 : RSSLoader.indexTriggered.containsKey(messageurl.hash()) ? 1 : 0);
                     prop.put("showitems_item_" + i + "_state_count", i);
                     prop.putHTML("showitems_item_" + i + "_state_guid", item.getGuid());
                     prop.putHTML("showitems_item_" + i + "_author", author == null ? "" : author);
diff --git a/htroot/yacy/transferRWI.java b/htroot/yacy/transferRWI.java
index 14186eb7a..fc747ff9d 100644
--- a/htroot/yacy/transferRWI.java
+++ b/htroot/yacy/transferRWI.java
@@ -28,7 +28,9 @@
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Set;
 
 import net.yacy.cora.document.ASCII;
 import net.yacy.cora.document.RSSMessage;
@@ -37,6 +39,7 @@ import net.yacy.cora.federate.yacy.Distribution;
 import net.yacy.cora.protocol.HeaderFramework;
 import net.yacy.cora.protocol.RequestHeader;
 import net.yacy.cora.storage.HandleSet;
+import net.yacy.cora.util.SpaceExceededException;
 import net.yacy.kelondro.data.meta.URIMetadataRow;
 import net.yacy.kelondro.data.word.WordReferenceRow;
 import net.yacy.kelondro.index.RowHandleSet;
@@ -156,8 +159,8 @@ public final class transferRWI {
         final ArrayList<String> wordhashes = new ArrayList<String>();
         int received = 0;
         int blocked = 0;
-        int receivedURL = 0;
         int count = 0;
+        Set<String> testids = new HashSet<String>();
         while (it.hasNext()) {
             serverCore.checkInterruption();
             estring = it.next();
@@ -200,17 +203,20 @@ public final class transferRWI {
                 serverCore.checkInterruption();
 
                 // check if we need to ask for the corresponding URL
-                if (!knownURL.has(urlHash) && !unknownURL.has(urlHash)) try {
-                    if (sb.index.fulltext().exists(ASCII.String(urlHash))) {
-                        knownURL.put(urlHash);
+                testids.add(ASCII.String(urlHash));
+                received++;
+            }
+            Set<String> existing = sb.index.fulltext().exists(testids);
+            for (String id: testids) {
+                try {
+                    if (existing.contains(id)) {
+                        knownURL.put(ASCII.getBytes(id));
                     } else {
-                        unknownURL.put(urlHash);
+                        unknownURL.put(ASCII.getBytes(id));
                     }
-                    receivedURL++;
-                } catch (final Exception ex) {
-                    sb.getLog().logWarning("transferRWI: DB-Error while trying to determine if URL with hash '" + ASCII.String(urlHash) + "' is known.", ex);
+                } catch (SpaceExceededException e) {
+                    sb.getLog().logWarning("transferRWI: DB-Error while trying to determine if URL with hash '" + id + "' is known.", e);
                 }
-                received++;
             }
 
             sb.peers.mySeed().incRI(received);
@@ -227,8 +233,8 @@ public final class transferRWI {
             final String firstHash = wordhashes.get(0);
             final String lastHash = wordhashes.get(wordhashes.size() - 1);
             final long avdist = (Distribution.horizontalDHTDistance(firstHash.getBytes(), ASCII.getBytes(sb.peers.mySeed().hash)) + Distribution.horizontalDHTDistance(lastHash.getBytes(), ASCII.getBytes(sb.peers.mySeed().hash))) / 2;
-            sb.getLog().logInfo("Received " + received + " RWIs, " + wordc + " Words [" + firstHash + " .. " + lastHash + "], processed in " + (System.currentTimeMillis() - startProcess) + " milliseconds, " + avdist + ", blocked " + blocked + ", requesting " + unknownURL.size() + "/" + receivedURL + " URLs from " + otherPeerName);
-            EventChannel.channels(EventChannel.DHTRECEIVE).addMessage(new RSSMessage("Received " + received + " RWIs, " + wordc + " Words [" + firstHash + " .. " + lastHash + "], processed in " + (System.currentTimeMillis() - startProcess) + " milliseconds, " + avdist + ", blocked " + blocked + ", requesting " + unknownURL.size() + "/" + receivedURL + " URLs from " + otherPeerName, "", otherPeer.hash));
+            sb.getLog().logInfo("Received " + received + " RWIs, " + wordc + " Words [" + firstHash + " .. " + lastHash + "], processed in " + (System.currentTimeMillis() - startProcess) + " milliseconds, " + avdist + ", blocked " + blocked + ", requesting " + unknownURL.size() + "/" + received + " URLs from " + otherPeerName);
+            EventChannel.channels(EventChannel.DHTRECEIVE).addMessage(new RSSMessage("Received " + received + " RWIs, " + wordc + " Words [" + firstHash + " .. " + lastHash + "], processed in " + (System.currentTimeMillis() - startProcess) + " milliseconds, " + avdist + ", blocked " + blocked + ", requesting " + unknownURL.size() + "/" + received + " URLs from " + otherPeerName, "", otherPeer.hash));
         }
 
         result = "ok";
diff --git a/htroot/yacy/transferURL.java b/htroot/yacy/transferURL.java
index 506b6d2c9..0213824df 100644
--- a/htroot/yacy/transferURL.java
+++ b/htroot/yacy/transferURL.java
@@ -28,6 +28,9 @@
 import java.io.IOException;
 import java.text.ParseException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 
 import net.yacy.cora.date.GenericFormatter;
 import net.yacy.cora.document.ASCII;
@@ -88,6 +91,7 @@ public final class transferURL {
             // read the urls from the other properties and store
             String urls;
             URIMetadataRow lEntry;
+            Map<String, URIMetadataRow> lEm = new HashMap<String, URIMetadataRow>();
             for (int i = 0; i < urlc; i++) {
                 serverCore.checkInterruption();
@@ -138,16 +142,16 @@ public final class transferURL {
                     continue;
                 }
 
-                // doublecheck
-                if (sb.index.exists(ASCII.String(lEntry.hash()))) {
-                    if (Network.log.isFine()) Network.log.logFine("transferURL: double URL '" + lEntry.url() + "' from peer " + otherPeerName);
-                    lEntry = null;
-                    doublecheck++;
-                    continue;
-                }
-
+                lEm.put(ASCII.String(lEntry.hash()), lEntry);
+            }
+
+            Set<String> nondoubles = sb.index.exists(lEm.keySet());
+            doublecheck += (lEm.size() - nondoubles.size());
+            for (String id: nondoubles) {
+                lEntry = lEm.get(id);
+
                 // write entry to database
-                if (Network.log.isFine()) Network.log.logFine("Accepting URL " + i + "/" + urlc + " from peer " + otherPeerName + ": " + lEntry.url().toNormalform(true));
+                if (Network.log.isFine()) Network.log.logFine("Accepting URL from peer " + otherPeerName + ": " + lEntry.url().toNormalform(true));
                 try {
                     sb.index.fulltext().putMetadataLater(lEntry);
                     ResultURLs.stack(ASCII.String(lEntry.url().hash()), lEntry.url().getHost(), iam.getBytes(), iam.getBytes(), EventOrigin.DHT_TRANSFER);
diff --git a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java
index b4cc0ed11..80593ea77 100644
--- a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java
+++ b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java
@@ -21,10 +21,13 @@
 package net.yacy.cora.federate.solr.connector;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -199,6 +202,12 @@ public abstract class AbstractSolrConnector implements SolrConnector {
         return docs;
     }
 
+    /**
+     * check if a given document, identified by its url hash as document id, exists
+     * @param id the url hash and document id
+     * @return true if any entry in solr exists
+     * @throws IOException
+     */
     @Override
     public boolean existsById(String id) throws IOException {
         // construct raw query
@@ -220,6 +229,42 @@ public abstract class AbstractSolrConnector implements SolrConnector {
         return exist;
     }
 
+    /**
+     * check a set of ids for existence.
+     * @param ids a collection of document ids
+     * @return a collection of the subset of the ids which exist in the index
+     * @throws IOException
+     */
+    public Set<String> existsByIds(Collection<String> ids) throws IOException {
+        if (ids == null || ids.size() == 0) return new HashSet<String>();
+        // construct raw query
+        final SolrQuery params = new SolrQuery();
+        //params.setQuery(CollectionSchema.id.getSolrFieldName() + ":\"" + id + "\"");
+        StringBuilder sb = new StringBuilder(); // construct something like "({!raw f=id}Ij7B63g-gSHA) OR ({!raw f=id}PBcGI3g-gSHA)"
+        for (String id: ids) {
+            sb.append("({!raw f=").append(CollectionSchema.id.getSolrFieldName()).append('}').append(id).append(") OR ");
+        }
+        if (sb.length() > 0) sb.setLength(sb.length() - 4); // cut off the last " OR "
+        params.setQuery(sb.toString());
+        //params.set("defType", "raw");
+        params.setRows(ids.size()); // we want all rows
+        params.setStart(0);
+        params.setFacet(false);
+        params.clearSorts();
+        params.setFields(CollectionSchema.id.getSolrFieldName());
+        params.setIncludeScore(false);
+
+        // query the server
+        QueryResponse rsp = getResponseByParams(params);
+        final SolrDocumentList docs = rsp.getResults();
+        // construct a new id list from that
+        HashSet<String> idsr = new HashSet<String>();
+        for (SolrDocument doc : docs) {
+            idsr.add((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()));
+        }
+        return idsr;
+    }
+
     /**
      * get the number of results when this query is done.
     * This should only be called if the actual result is never used, and only the count is interesting
diff --git a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java
index a6fa9f052..47c74254a 100644
--- a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java
+++ b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java
@@ -22,8 +22,10 @@
 package net.yacy.cora.federate.solr.connector;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -318,6 +320,25 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
         return false;
     }
 
+    @Override
+    public Set<String> existsByIds(Collection<String> ids) throws IOException {
+        HashSet<String> e = new HashSet<String>();
+        if (ids == null || ids.size() == 0) return e;
+        Collection<String> idsC = new HashSet<String>();
+        for (String id: ids) {
+            if (this.idCache.has(ASCII.getBytes(id))) {cacheSuccessSign(); e.add(id); continue;}
+            if (existIdFromDeleteQueue(id)) {cacheSuccessSign(); continue;}
+            if (existIdFromUpdateQueue(id)) {cacheSuccessSign(); e.add(id); continue;}
+            idsC.add(id);
+        }
+        Set<String> e1 = this.connector.existsByIds(idsC);
+        for (String id1: e1) {
+            updateIdCache(id1);
+        }
+        e.addAll(e1);
+        return e;
+    }
+
     @Override
     public boolean existsByQuery(String solrquery) throws IOException {
         // this is actually wrong but to make it right we need to wait until all queues are flushed. But that may take very long when the queues are filled again all the time.
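Note (illustration only, not part of the diff): the concurrent connector
answers what it can from its id cache and pending queues and forwards only
the remaining ids to the wrapped connector. A generic, self-contained sketch
of this filter-then-delegate pattern; Backend and the cache here are
stand-ins, not YaCy classes:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class FilterThenDelegate {
        interface Backend { Set<String> existsByIds(Collection<String> ids); }

        // answer from a local cache first, ask the backend only for the rest
        static Set<String> exists(Set<String> cache, Backend backend, Collection<String> ids) {
            Set<String> found = new HashSet<String>();
            List<String> remaining = new ArrayList<String>();
            for (String id : ids) {
                if (cache.contains(id)) found.add(id); else remaining.add(id);
            }
            Set<String> fromBackend = backend.existsByIds(remaining);
            cache.addAll(fromBackend); // remember backend hits for next time
            found.addAll(fromBackend);
            return found;
        }
    }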
diff --git a/source/net/yacy/cora/federate/solr/connector/SolrConnector.java b/source/net/yacy/cora/federate/solr/connector/SolrConnector.java
index 4b71a7aac..f429f78ef 100644
--- a/source/net/yacy/cora/federate/solr/connector/SolrConnector.java
+++ b/source/net/yacy/cora/federate/solr/connector/SolrConnector.java
@@ -23,6 +23,7 @@
 package net.yacy.cora.federate.solr.connector;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 
 import net.yacy.cora.sorting.ReversibleScoreMap;
@@ -65,7 +66,7 @@ public interface SolrConnector extends Iterable<String> /* Iterable of document ids */ {
     public void clear() throws IOException;
 
     /**
-     * delete an entry from solr
+     * delete an entry from solr using the url hash as document id
      * @param id the url hash of the entry
     * @throws IOException
     */
@@ -86,13 +87,21 @@ public interface SolrConnector extends Iterable<String> /* Iterable of document ids */ {
     public void deleteByQuery(final String querystring) throws IOException;
 
     /**
-     * check if a given id exists
-     * @param id
+     * check if a given document, identified by its url hash as document id, exists
+     * @param id the url hash and document id
      * @return true if any entry in solr exists
      * @throws IOException
      */
     public boolean existsById(final String id) throws IOException;
 
+    /**
+     * check a set of ids for existence.
+     * @param ids a collection of document ids
+     * @return a collection of the subset of the ids which exist in the index
+     * @throws IOException
+     */
+    public Set<String> existsByIds(Collection<String> ids) throws IOException;
+
     /**
      * check if a given document exists in solr
     * @param solrquery
diff --git a/source/net/yacy/cora/protocol/ftp/FTPClient.java b/source/net/yacy/cora/protocol/ftp/FTPClient.java
index 534c63480..8a00cf9e5 100644
--- a/source/net/yacy/cora/protocol/ftp/FTPClient.java
+++ b/source/net/yacy/cora/protocol/ftp/FTPClient.java
@@ -1988,6 +1988,9 @@ public class FTPClient {
         String reply;
 
         while (true) {
+            if (this.clientInput == null) {
+                throw new IOException("Server has presumably shut down the connection.");
+            }
             reply = this.clientInput.readLine();
 
             // sanity check
diff --git a/source/net/yacy/crawler/retrieval/HTTPLoader.java b/source/net/yacy/crawler/retrieval/HTTPLoader.java
index b50a78dc7..cac26506c 100644
--- a/source/net/yacy/crawler/retrieval/HTTPLoader.java
+++ b/source/net/yacy/crawler/retrieval/HTTPLoader.java
@@ -164,22 +164,23 @@ public final class HTTPLoader {
             }
 
             if (this.sb.getConfigBool(SwitchboardConstants.CRAWLER_FOLLOW_REDIRECTS, true)) {
-                // if we are already doing a shutdown we don't need to retry crawling
-                if (Thread.currentThread().isInterrupted()) {
-                    this.sb.crawlQueues.errorURL.push(request, myHash, new Date(), 1, FailCategory.FINAL_LOAD_CONTEXT, "server shutdown", statusCode);
-                    throw new IOException("CRAWLER Retry of URL=" + requestURLString + " aborted because of server shutdown.");
-                }
+                // if we are already doing a shutdown we don't need to retry crawling
+                if (Thread.currentThread().isInterrupted()) {
+                    this.sb.crawlQueues.errorURL.push(request, myHash, new Date(), 1, FailCategory.FINAL_LOAD_CONTEXT, "server shutdown", statusCode);
+                    throw new IOException("CRAWLER Retry of URL=" + requestURLString + " aborted because of server shutdown.");
+                }
 
-                // check if the url was already indexed
-                final HarvestProcess dbname = this.sb.urlExists(ASCII.String(redirectionUrl.hash()));
-                if (dbname != null) { // customer request
-                    this.sb.crawlQueues.errorURL.push(request, myHash, new Date(), 1, FailCategory.TEMPORARY_NETWORK_FAILURE, "redirection to double content", statusCode);
-                    throw new IOException("CRAWLER Redirection of URL=" + requestURLString + " ignored. The url appears already in db " + dbname.toString());
-                }
+                // check if the url was already indexed
+                @SuppressWarnings("deprecation")
+                final HarvestProcess dbname = this.sb.urlExists(ASCII.String(redirectionUrl.hash()));
+                if (dbname != null) { // customer request
+                    this.sb.crawlQueues.errorURL.push(request, myHash, new Date(), 1, FailCategory.TEMPORARY_NETWORK_FAILURE, "redirection to double content", statusCode);
+                    throw new IOException("CRAWLER Redirection of URL=" + requestURLString + " ignored. The url appears already in db " + dbname.toString());
+                }
 
-                // retry crawling with new url
-                request.redirectURL(redirectionUrl);
-                return load(request, retryCount - 1, maxFileSize, blacklistType);
+                // retry crawling with new url
+                request.redirectURL(redirectionUrl);
+                return load(request, retryCount - 1, maxFileSize, blacklistType);
             }
             // we don't want to follow redirects
             this.sb.crawlQueues.errorURL.push(request, myHash, new Date(), 1, FailCategory.FINAL_PROCESS_CONTEXT, "redirection not wanted", statusCode);
diff --git a/source/net/yacy/crawler/retrieval/RSSLoader.java b/source/net/yacy/crawler/retrieval/RSSLoader.java
index fb80849b4..3659db156 100644
--- a/source/net/yacy/crawler/retrieval/RSSLoader.java
+++ b/source/net/yacy/crawler/retrieval/RSSLoader.java
@@ -26,7 +26,14 @@ package net.yacy.crawler.retrieval;
 
 import java.io.IOException;
 import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import net.yacy.cora.document.ASCII;
 import net.yacy.cora.document.RSSFeed;
@@ -38,9 +45,9 @@ import net.yacy.cora.order.Base64Order;
 import net.yacy.cora.storage.ARC;
 import net.yacy.cora.storage.ComparableARC;
 import net.yacy.cora.util.SpaceExceededException;
+import net.yacy.crawler.HarvestProcess;
 import net.yacy.crawler.data.CrawlQueues;
 import net.yacy.data.WorkTables;
-import net.yacy.document.Parser.Failure;
 import net.yacy.kelondro.blob.Tables;
 import net.yacy.kelondro.data.meta.DigestURI;
 import net.yacy.kelondro.logging.Log;
@@ -89,20 +96,25 @@ public class RSSLoader extends Thread {
 
     public static void indexAllRssFeed(final Switchboard sb, final DigestURI url, final RSSFeed feed, String[] collections) {
         int loadCount = 0;
-        loop: for (final RSSMessage message: feed) {
+        List<DigestURI> list = new ArrayList<DigestURI>();
+        Map<String, DigestURI> urlmap = new HashMap<String, DigestURI>();
+        for (final RSSMessage message: feed) {
             try {
                 final DigestURI messageurl = new DigestURI(message.getLink());
-                if (indexTriggered.containsKey(messageurl.hash())) continue loop;
-                if (sb.urlExists(ASCII.String(messageurl.hash())) != null) continue loop;
-                sb.addToIndex(messageurl, null, null, collections);
-                indexTriggered.insertIfAbsent(messageurl.hash(), new Date());
-                loadCount++;
+                if (indexTriggered.containsKey(messageurl.hash())) continue;
+                urlmap.put(ASCII.String(messageurl.hash()), messageurl);
             } catch (final IOException e) {
                 Log.logException(e);
-            } catch (final Failure e) {
-                Log.logException(e);
             }
         }
+        Map<String, HarvestProcess> existingids = sb.urlExists(urlmap.keySet());
+        for (final Map.Entry<String, DigestURI> e: urlmap.entrySet()) {
+            if (existingids.get(e.getKey()) != null) continue;
+            list.add(e.getValue());
+            indexTriggered.insertIfAbsent(ASCII.getBytes(e.getKey()), new Date());
+            loadCount++;
+        }
+        sb.addToIndex(list, null, null, collections);
 
         // update info for loading
         try {
diff --git a/source/net/yacy/crawler/retrieval/SitemapImporter.java b/source/net/yacy/crawler/retrieval/SitemapImporter.java
index 9f31560e0..e75e93c70 100644
--- a/source/net/yacy/crawler/retrieval/SitemapImporter.java
+++ b/source/net/yacy/crawler/retrieval/SitemapImporter.java
@@ -82,6 +82,7 @@ public class SitemapImporter extends Thread {
         // check if the url is known and needs to be recrawled
         Date lastMod = entry.lastmod(null);
         if (lastMod != null) {
+            @SuppressWarnings("deprecation")
             final HarvestProcess dbocc = this.sb.urlExists(ASCII.String(nexturlhash));
             if (dbocc != null && dbocc == HarvestProcess.LOADED) {
                 // the url was already loaded. we need to check the date
diff --git a/source/net/yacy/kelondro/workflow/WorkflowProcessor.java b/source/net/yacy/kelondro/workflow/WorkflowProcessor.java
index 1efff648a..ecb4f6d5f 100644
--- a/source/net/yacy/kelondro/workflow/WorkflowProcessor.java
+++ b/source/net/yacy/kelondro/workflow/WorkflowProcessor.java
@@ -234,7 +234,7 @@ public class WorkflowProcessor {
         // wait for shutdown
         try {
             this.executor.shutdown();
-            this.executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+            this.executor.awaitTermination(60, TimeUnit.SECONDS);
         } catch (final InterruptedException e) {}
     }
     Log.logInfo("serverProcessor", "queue " + this.processName + ": shutdown.");
diff --git a/source/net/yacy/migration.java b/source/net/yacy/migration.java
index ec192dc42..6871ee3bc 100644
--- a/source/net/yacy/migration.java
+++ b/source/net/yacy/migration.java
@@ -34,25 +34,14 @@ import net.yacy.search.Switchboard;
 import net.yacy.search.SwitchboardConstants;
 
 import com.google.common.io.Files;
-import static java.lang.Thread.MIN_PRIORITY;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.concurrent.Semaphore;
-import net.yacy.cora.federate.solr.connector.EmbeddedSolrConnector;
 import net.yacy.cora.storage.Configuration.Entry;
 import net.yacy.kelondro.data.meta.URIMetadataRow;
 import net.yacy.kelondro.index.Index;
 import net.yacy.kelondro.index.Row;
-import net.yacy.kelondro.workflow.AbstractBusyThread;
-import net.yacy.kelondro.workflow.AbstractThread;
 import net.yacy.kelondro.workflow.BusyThread;
-import net.yacy.kelondro.workflow.InstantBusyThread;
-import net.yacy.kelondro.workflow.WorkflowThread;
 import net.yacy.search.index.Fulltext;
 import net.yacy.search.schema.CollectionConfiguration;
-import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrInputDocument;
 
 public class migration {
     //SVN constants
diff --git a/source/net/yacy/peers/Transmission.java b/source/net/yacy/peers/Transmission.java
index 97633606c..a070612fa 100644
--- a/source/net/yacy/peers/Transmission.java
+++ b/source/net/yacy/peers/Transmission.java
@@ -25,9 +25,12 @@
 package net.yacy.peers;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 
 import net.yacy.cora.document.ASCII;
 import net.yacy.cora.storage.HandleSet;
@@ -160,8 +163,9 @@ public class Transmission {
             }
             final ReferenceContainer<WordReference> c = (remaining >= container.size()) ? container : trimContainer(container, remaining);
             // iterate through the entries in the container and check if the reference is in the repository
-            final Iterator<WordReference> i = c.entries();
             final List<byte[]> notFoundx = new ArrayList<byte[]>();
+            Collection<String> testids = new HashSet<String>();
+            Iterator<WordReference> i = c.entries();
             while (i.hasNext()) {
                 final WordReference e = i.next();
                 if (this.references.has(e.urlhash())) continue;
@@ -169,11 +173,17 @@ public class Transmission {
                     notFoundx.add(e.urlhash());
                     continue;
                 }
-                if (!Transmission.this.segment.fulltext().exists(ASCII.String(e.urlhash()))) {
+                testids.add(ASCII.String(e.urlhash()));
+            }
+            Set<String> existingids = Transmission.this.segment.fulltext().exists(testids);
+            i = c.entries();
+            while (i.hasNext()) {
+                final WordReference e = i.next();
+                if (existingids.contains(ASCII.String(e.urlhash()))) {
+                    this.references.put(e.urlhash());
+                } else {
                     notFoundx.add(e.urlhash());
                     this.badReferences.put(e.urlhash());
-                } else {
-                    this.references.put(e.urlhash());
                 }
             }
             // now delete all references that were not found
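Note (illustration only, not part of the diff): several call sites
(transferRWI, Transmission above) now share the same two-pass shape: first
collect the candidate hashes, then run one batched existence test, then
branch per id on membership in the returned set. A condensed, self-contained
illustration with made-up hashes:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class TwoPassSketch {
        public static void main(String[] args) {
            List<String> candidates = Arrays.asList("hashA", "hashB", "hashC");
            // pretend the index already contains hashB
            Set<String> existing = new HashSet<String>(Collections.singleton("hashB"));

            List<String> known = new ArrayList<String>();
            List<String> unknown = new ArrayList<String>();
            for (String id : candidates) {
                if (existing.contains(id)) known.add(id); else unknown.add(id);
            }
            System.out.println("known=" + known + " unknown=" + unknown);
            // prints: known=[hashB] unknown=[hashA, hashC]
        }
    }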
diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java
index 7da4c3d4e..1c6de3277 100644
--- a/source/net/yacy/search/Switchboard.java
+++ b/source/net/yacy/search/Switchboard.java
@@ -1570,14 +1570,31 @@ public final class Switchboard extends serverSwitch {
         return false;
     }
 
+    /**
+     * tests if a hash occurs in any database.
+     * @param hash
+     * @return the name of the database if the hash exists; null if it does not exist
+     */
+    @Deprecated
     public HarvestProcess urlExists(final String hash) {
-        // tests if hash occurs in any database
-        // if it exists, the name of the database is returned,
-        // if it does not exist, null is returned
         if (this.index.exists(hash)) return HarvestProcess.LOADED;
         return this.crawlQueues.urlExists(ASCII.getBytes(hash));
     }
 
+    /**
+     * tests if hashes occur in any database.
+     * @param ids a collection of url hashes
+     * @return a map from each hash to the name of the database where it exists, or null if it exists nowhere
+     */
+    public Map<String, HarvestProcess> urlExists(final Collection<String> ids) {
+        Set<String> e = this.index.exists(ids);
+        Map<String, HarvestProcess> m = new HashMap<String, HarvestProcess>();
+        for (String id: ids) {
+            m.put(id, e.contains(id) ? HarvestProcess.LOADED : this.crawlQueues.urlExists(ASCII.getBytes(id)));
+        }
+        return m;
+    }
+
     public void urlRemove(final Segment segment, final byte[] hash) {
         segment.fulltext().remove(hash);
         ResultURLs.remove(ASCII.String(hash));
@@ -2768,36 +2785,25 @@ public final class Switchboard extends serverSwitch {
         final String heuristicName,
         final String[] collections) {
 
+        List<DigestURI> urls = new ArrayList<DigestURI>();
         // add the landing page to the index. should not load that again since it should be in the cache
-        if ( url != null ) {
-            try {
-                addToIndex(url, searchEvent, heuristicName, collections);
-            } catch ( final IOException e ) {
-            } catch ( final Parser.Failure e ) {
-            }
-
+        if (url != null) {
+            urls.add(url);
         }
 
         // check if some of the links match with the query
         final Map<MultiProtocolURI, String> matcher = searchEvent.query.separateMatches(links);
 
         // take the matcher and load them all
-        for ( final Map.Entry<MultiProtocolURI, String> entry : matcher.entrySet() ) {
-            try {
-                addToIndex(new DigestURI(entry.getKey(), (byte[]) null), searchEvent, heuristicName, collections);
-            } catch ( final IOException e ) {
-            } catch ( final Parser.Failure e ) {
-            }
+        for (final Map.Entry<MultiProtocolURI, String> entry : matcher.entrySet()) {
+            urls.add(new DigestURI(entry.getKey(), (byte[]) null));
         }
 
         // take then the no-matcher and load them also
-        for ( final Map.Entry<MultiProtocolURI, String> entry : links.entrySet() ) {
-            try {
-                addToIndex(new DigestURI(entry.getKey(), (byte[]) null), searchEvent, heuristicName, collections);
-            } catch ( final IOException e ) {
-            } catch ( final Parser.Failure e ) {
-            }
+        for (final Map.Entry<MultiProtocolURI, String> entry : links.entrySet()) {
+            urls.add(new DigestURI(entry.getKey(), (byte[]) null));
         }
+        addToIndex(urls, searchEvent, heuristicName, collections);
     }
 
     public void remove(final Collection<String> deleteIDs) {
@@ -2837,6 +2843,7 @@ public final class Switchboard extends serverSwitch {
      * @param url
      * @return null if this was ok. If this failed, return a string with a fail reason
      */
+    @SuppressWarnings("deprecation")
     public String stackUrl(CrawlProfile profile, DigestURI url) {
 
         byte[] handle = ASCII.getBytes(profile.handle());
@@ -2946,73 +2953,72 @@ public final class Switchboard extends serverSwitch {
      * @throws IOException
      * @throws Parser.Failure
      */
-    public void addToIndex(final DigestURI url, final SearchEvent searchEvent, final String heuristicName, final String[] collections)
-        throws IOException,
-        Parser.Failure {
+    public void addToIndex(final Collection<DigestURI> urls, final SearchEvent searchEvent, final String heuristicName, final String[] collections) {
+        Map<String, DigestURI> urlmap = new HashMap<String, DigestURI>();
+        for (DigestURI url: urls) urlmap.put(ASCII.String(url.hash()), url);
         if (searchEvent != null) {
-            searchEvent.addHeuristic(url.hash(), heuristicName, true);
-        }
-        if (this.index.exists(ASCII.String(url.hash()))) {
-            return; // don't do double-work
-        }
-        final Request request = this.loader.request(url, true, true);
-        final CrawlProfile profile = this.crawler.getActive(ASCII.getBytes(request.profileHandle()));
-        final String acceptedError = this.crawlStacker.checkAcceptance(url, profile, 0);
-        final String urls = url.toNormalform(true);
-        if ( acceptedError != null ) {
-            this.log.logWarning("addToIndex: cannot load "
-                + urls
-                + ": "
-                + acceptedError);
-            return;
+            for (String id: urlmap.keySet()) searchEvent.addHeuristic(ASCII.getBytes(id), heuristicName, true);
+        }
+        final Set<String> existing = this.index.exists(urlmap.keySet());
+        final List<Request> requests = new ArrayList<Request>();
+        for (Map.Entry<String, DigestURI> e: urlmap.entrySet()) {
+            final String urlName = e.getValue().toNormalform(true);
+            if (existing.contains(e.getKey())) {
+                this.log.logInfo("addToIndex: double " + urlName);
+                continue;
+            }
+            final Request request = this.loader.request(e.getValue(), true, true);
+            final CrawlProfile profile = this.crawler.getActive(ASCII.getBytes(request.profileHandle()));
+            final String acceptedError = this.crawlStacker.checkAcceptance(e.getValue(), profile, 0);
+            if (acceptedError != null) {
+                this.log.logWarning("addToIndex: cannot load " + urlName + ": " + acceptedError);
+                continue;
+            }
+            requests.add(request);
         }
+
         new Thread() {
             @Override
             public void run() {
-                Thread.currentThread().setName("Switchboard.addToIndex:" + urls);
-                try {
-                    final Response response =
-                        Switchboard.this.loader.load(request, CacheStrategy.IFFRESH, BlacklistType.CRAWLER, CrawlQueues.queuedMinLoadDelay);
-                    if ( response == null ) {
-                        throw new IOException("response == null");
-                    }
-                    if ( response.getContent() == null ) {
-                        throw new IOException("content == null");
-                    }
-                    if ( response.getResponseHeader() == null ) {
-                        throw new IOException("header == null");
-                    }
-                    final Document[] documents = response.parse();
-                    if ( documents != null ) {
-                        for ( final Document document : documents ) {
-                            if ( document.indexingDenied() ) {
-                                throw new Parser.Failure("indexing is denied", url);
+                for (Request request: requests) {
+                    DigestURI url = request.url();
+                    String urlName = url.toNormalform(true);
+                    Thread.currentThread().setName("Switchboard.addToIndex:" + urlName);
+                    try {
+                        final Response response = Switchboard.this.loader.load(request, CacheStrategy.IFFRESH, BlacklistType.CRAWLER, CrawlQueues.queuedMinLoadDelay);
+                        if (response == null) {
+                            throw new IOException("response == null");
+                        }
+                        if (response.getContent() == null) {
+                            throw new IOException("content == null");
+                        }
+                        if (response.getResponseHeader() == null) {
+                            throw new IOException("header == null");
+                        }
+                        final Document[] documents = response.parse();
+                        if (documents != null) {
+                            for (final Document document: documents) {
+                                if (document.indexingDenied()) {
+                                    throw new Parser.Failure("indexing is denied", url);
+                                }
+                                final Condenser condenser = new Condenser(document, true, true, LibraryProvider.dymLib, LibraryProvider.synonyms, true);
+                                ResultImages.registerImages(url, document, true);
+                                Switchboard.this.webStructure.generateCitationReference(url, document);
+                                storeDocumentIndex(
+                                    response,
+                                    collections,
+                                    document,
+                                    condenser,
+                                    searchEvent,
+                                    "heuristic:" + heuristicName);
+                                Switchboard.this.log.logInfo("addToIndex fill of url " + urlName + " finished");
                             }
-                            final Condenser condenser = new Condenser(document, true, true, LibraryProvider.dymLib, LibraryProvider.synonyms, true);
-                            ResultImages.registerImages(url, document, true);
-                            Switchboard.this.webStructure.generateCitationReference(url, document);
-                            storeDocumentIndex(
-                                response,
-                                collections,
-                                document,
-                                condenser,
-                                searchEvent,
-                                "heuristic:" + heuristicName);
-                            Switchboard.this.log.logInfo("addToIndex fill of url "
-                                + url.toNormalform(true)
-                                + " finished");
                         }
+                    } catch ( final IOException e ) {
+                        Switchboard.this.log.logWarning("addToIndex: failed loading " + urlName + ": " + e.getMessage());
+                    } catch ( final Parser.Failure e ) {
+                        Switchboard.this.log.logWarning("addToIndex: failed parsing " + urlName + ": " + e.getMessage());
                     }
-                } catch ( final IOException e ) {
-                    Switchboard.this.log.logWarning("addToIndex: failed loading "
-                        + url.toNormalform(true)
-                        + ": "
-                        + e.getMessage());
-                } catch ( final Parser.Failure e ) {
-                    Switchboard.this.log.logWarning("addToIndex: failed parsing "
-                        + url.toNormalform(true)
-                        + ": "
-                        + e.getMessage());
                 }
             }
         }.start();
@@ -3026,33 +3032,30 @@ public final class Switchboard extends serverSwitch {
      * @param url the url that shall be indexed
      * @param asglobal true adds the url to global crawl queue (for remote crawling), false to the local crawler
      */
-    public void addToCrawler(final DigestURI url, final boolean asglobal) {
-
-        if (this.index.exists(ASCII.String(url.hash()))) {
-            return; // don't do double-work
-        }
-        final Request request = this.loader.request(url, true, true);
-        final CrawlProfile profile = this.crawler.getActive(ASCII.getBytes(request.profileHandle()));
-        final String acceptedError = this.crawlStacker.checkAcceptance(url, profile, 0);
-        if (acceptedError != null) {
-            this.log.logInfo("addToCrawler: cannot load "
-                + url.toNormalform(true)
-                + ": "
-                + acceptedError);
-            return;
-        }
-        final String s;
-        if (asglobal) {
-            s = this.crawlQueues.noticeURL.push(StackType.GLOBAL, request, this.robots);
-        } else {
-            s = this.crawlQueues.noticeURL.push(StackType.LOCAL, request, this.robots);
-        }
-
-        if (s != null) {
-            Switchboard.this.log.logInfo("addToCrawler: failed to add "
-                + url.toNormalform(true)
-                + ": "
-                + s);
+    public void addToCrawler(final Collection<DigestURI> urls, final boolean asglobal) {
+        Map<String, DigestURI> urlmap = new HashMap<String, DigestURI>();
+        for (DigestURI url: urls) urlmap.put(ASCII.String(url.hash()), url);
+        Set<String> existingids = this.index.exists(urlmap.keySet());
+        for (Map.Entry<String, DigestURI> e: urlmap.entrySet()) {
+            if (existingids.contains(e.getKey())) continue; // double
+            DigestURI url = e.getValue();
+            final Request request = this.loader.request(url, true, true);
+            final CrawlProfile profile = this.crawler.getActive(ASCII.getBytes(request.profileHandle()));
+            final String acceptedError = this.crawlStacker.checkAcceptance(url, profile, 0);
+            if (acceptedError != null) {
+                this.log.logInfo("addToCrawler: cannot load " + url.toNormalform(true) + ": " + acceptedError);
+                return;
+            }
+            final String s;
+            if (asglobal) {
+                s = this.crawlQueues.noticeURL.push(StackType.GLOBAL, request, this.robots);
+            } else {
+                s = this.crawlQueues.noticeURL.push(StackType.LOCAL, request, this.robots);
+            }
+
+            if (s != null) {
+                Switchboard.this.log.logInfo("addToCrawler: failed to add " + url.toNormalform(true) + ": " + s);
+            }
         }
     }
@@ -3413,14 +3416,16 @@ public final class Switchboard extends serverSwitch {
                     if (links.size() < 1000) { // limit to 1000 to skip large index pages
                         final Iterator<DigestURI> i = links.keySet().iterator();
                         final boolean globalcrawljob = Switchboard.this.getConfigBool("heuristic.searchresults.crawlglobal",false);
+                        Collection<DigestURI> urls = new ArrayList<DigestURI>();
                         while (i.hasNext()) {
                             url = i.next();
                             boolean islocal = url.getHost().contentEquals(startUrl.getHost());
                             // add all external links or links to different page to crawler
                             if ( !islocal ) {// || (!startUrl.getPath().endsWith(url.getPath()))) {
-                                addToCrawler(url,globalcrawljob);
+                                urls.add(url);
                             }
                         }
+                        addToCrawler(urls, globalcrawljob);
                     }
                 }
             } catch (final Throwable e) {
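Note (illustration only, not part of the diff): the new collection-based
urlExists answers from the fulltext index with one batched lookup and falls
back to the crawl queues per id. A self-contained sketch of that combination,
with stand-in types and the HarvestProcess values reduced to plain strings:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class UrlExistsSketch {
        // batch lookup against the index, per-id fallback to a second source
        static Map<String, String> urlExists(Set<String> indexed, Map<String, String> queued, Collection<String> ids) {
            Map<String, String> m = new HashMap<String, String>();
            for (String id : ids) {
                m.put(id, indexed.contains(id) ? "LOADED" : queued.get(id)); // null if nowhere
            }
            return m;
        }
        public static void main(String[] args) {
            Set<String> indexed = new HashSet<String>(Arrays.asList("h1"));
            Map<String, String> queued = new HashMap<String, String>();
            queued.put("h2", "CRAWLER");
            System.out.println(urlExists(indexed, queued, Arrays.asList("h1", "h2", "h3")));
            // prints e.g. {h1=LOADED, h2=CRAWLER, h3=null} (map order may vary)
        }
    }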
diff --git a/source/net/yacy/search/index/Fulltext.java b/source/net/yacy/search/index/Fulltext.java
index 20f3ec3d2..b05512095 100644
--- a/source/net/yacy/search/index/Fulltext.java
+++ b/source/net/yacy/search/index/Fulltext.java
@@ -31,9 +31,11 @@
 import java.net.MalformedURLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -691,6 +693,7 @@ public final class Fulltext {
         return false;
     }
 
+    @Deprecated
     public boolean exists(final String urlHash) {
         if (urlHash == null) return false;
         for (URIMetadataRow entry: this.pendingCollectionInputRows) {
@@ -707,6 +710,29 @@ public final class Fulltext {
         if (this.urlIndexFile != null && this.urlIndexFile.has(ASCII.getBytes(urlHash))) return true;
         return false;
     }
+
+    public Set<String> exists(Collection<String> ids) {
+        HashSet<String> e = new HashSet<String>();
+        if (ids == null || ids.size() == 0) return e;
+        Collection<String> idsC = new HashSet<String>();
+        for (String id: ids) {
+            for (URIMetadataRow entry: this.pendingCollectionInputRows) {
+                if (id.equals(ASCII.String(entry.hash()))) {e.add(id); continue;}
+            }
+            for (SolrInputDocument doc: this.pendingCollectionInputDocuments) {
+                if (id.equals(doc.getFieldValue(CollectionSchema.id.getSolrFieldName()))) {e.add(id); continue;}
+            }
+            if (this.urlIndexFile != null && this.urlIndexFile.has(ASCII.getBytes(id))) {e.add(id); continue;}
+            idsC.add(id);
+        }
+        try {
+            Set<String> e1 = this.getDefaultConnector().existsByIds(idsC);
+            e.addAll(e1);
+        } catch (final Throwable ee) {
+            Log.logException(ee);
+        }
+        return e;
+    }
 
     public String failReason(final String urlHash) throws IOException {
         if (urlHash == null) return null;
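Note (illustration only, not part of the diff): Fulltext.exists(Collection)
consults the in-memory pending buffers and the legacy url index before it
ever queries Solr, so recently queued documents count as existing even though
they are not yet committed. A generic sketch of such a layered membership
test; the layers and their contents are made up:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class LayeredExistsSketch {
        // earlier layers are cheaper and fresher; the last layer is the remote index
        static Set<String> exists(Collection<String> ids, List<Set<String>> layers) {
            Set<String> found = new HashSet<String>();
            for (String id : ids) {
                for (Set<String> layer : layers) {
                    if (layer.contains(id)) { found.add(id); break; }
                }
            }
            return found;
        }
        public static void main(String[] args) {
            Set<String> pending = new HashSet<String>(Arrays.asList("a"));
            Set<String> solr = new HashSet<String>(Arrays.asList("b"));
            System.out.println(exists(Arrays.asList("a", "b", "c"), Arrays.asList(pending, solr)));
            // prints [a, b] (set order may vary)
        }
    }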
diff --git a/source/net/yacy/search/index/Segment.java b/source/net/yacy/search/index/Segment.java
index f8c451e02..b7baf841f 100644
--- a/source/net/yacy/search/index/Segment.java
+++ b/source/net/yacy/search/index/Segment.java
@@ -29,6 +29,7 @@ package net.yacy.search.index;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
+import java.util.Collection;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.Map;
@@ -295,10 +296,15 @@ public class Segment {
         }
     }
 
+    @Deprecated
     public boolean exists(final String urlhash) {
         return this.fulltext.exists(urlhash);
     }
 
+    public Set<String> exists(final Collection<String> ids) {
+        return this.fulltext.exists(ids);
+    }
+
     /**
      * discover all urls that start with a given url stub
     * @param stub
diff --git a/source/net/yacy/search/query/SearchEvent.java b/source/net/yacy/search/query/SearchEvent.java
index b561f0863..a6f6727aa 100644
--- a/source/net/yacy/search/query/SearchEvent.java
+++ b/source/net/yacy/search/query/SearchEvent.java
@@ -26,7 +26,6 @@
 package net.yacy.search.query;
 
-import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
@@ -42,7 +41,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import net.yacy.contentcontrol.ContentControlFilterUpdateThread;
-import net.yacy.cora.date.GenericFormatter;
 import net.yacy.cora.document.ASCII;
 import net.yacy.cora.document.MultiProtocolURI;
 import net.yacy.cora.document.UTF8;
@@ -98,6 +96,7 @@ public final class SearchEvent {
 
     private static final int max_results_rwi = 3000;
 
+    /*
     private static long noRobinsonLocalRWISearch = 0;
     static {
         try {
@@ -105,6 +104,7 @@ public final class SearchEvent {
         } catch (ParseException e) {
         }
     }
+    */
 
     public static Log log = new Log("SEARCH");
diff --git a/source/net/yacy/server/serverObjects.java b/source/net/yacy/server/serverObjects.java
index cb42c355f..a148e323f 100644
--- a/source/net/yacy/server/serverObjects.java
+++ b/source/net/yacy/server/serverObjects.java
@@ -140,8 +140,10 @@ public class serverObjects implements Serializable, Cloneable {
 
     public List<Map.Entry<String, String>> entrySet() {
         List<Map.Entry<String, String>> set = new ArrayList<Map.Entry<String, String>>(this.map.getMap().size() * 2);
-        for (Map.Entry<String, String[]> entry: this.map.getMap().entrySet()) {
-            for (String v: entry.getValue()) set.add(new AbstractMap.SimpleEntry<String, String>(entry.getKey(), v));
+        Set<Map.Entry<String, String[]>> mset = this.map.getMap().entrySet();
+        for (Map.Entry<String, String[]> entry: mset) {
+            String[] vlist = entry.getValue();
+            for (String v: vlist) set.add(new AbstractMap.SimpleEntry<String, String>(entry.getKey(), v));
         }
         return set;
     }
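Note (illustration only, not part of the diff): the serverObjects change
above only unpacks the multi-valued map into local variables; the behavior
stays the same. A self-contained sketch of the flattening it performs:

    import java.util.AbstractMap;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class EntryFlattenSketch {
        // flatten a multi-valued map into single-valued entries, as serverObjects.entrySet() does
        static List<Map.Entry<String, String>> entrySet(Map<String, String[]> map) {
            List<Map.Entry<String, String>> set = new ArrayList<Map.Entry<String, String>>(map.size() * 2);
            for (Map.Entry<String, String[]> entry : map.entrySet()) {
                for (String v : entry.getValue()) {
                    set.add(new AbstractMap.SimpleEntry<String, String>(entry.getKey(), v));
                }
            }
            return set;
        }
        public static void main(String[] args) {
            Map<String, String[]> m = new HashMap<String, String[]>();
            m.put("k", new String[] {"v1", "v2"});
            System.out.println(entrySet(m)); // prints [k=v1, k=v2]
        }
    }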