diff --git a/htroot/IndexControl_p.java b/htroot/IndexControl_p.java index dbe3951dd..1e7cf032b 100644 --- a/htroot/IndexControl_p.java +++ b/htroot/IndexControl_p.java @@ -152,11 +152,11 @@ public class IndexControl_p { plasmaWordIndexEntity index = null; try { index = switchboard.wordIndex.getEntity(keyhash, true); - Enumeration en = index.elements(true); + Iterator en = index.elements(true); int i = 0; urlx = new String[index.size()]; - while (en.hasMoreElements()) { - urlx[i++] = ((plasmaWordIndexEntry) en.nextElement()).getUrlHash(); + while (en.hasNext()) { + urlx[i++] = ((plasmaWordIndexEntry) en.next()).getUrlHash(); } index.close(); index = null; @@ -260,13 +260,13 @@ public class IndexControl_p { long starttime = System.currentTimeMillis(); indexes[0] = switchboard.wordIndex.getEntity(keyhash, true); // built urlCache - Enumeration urlEnum = indexes[0].elements(true); + Iterator urlIter = indexes[0].elements(true); HashMap knownURLs = new HashMap(); HashSet unknownURLEntries = new HashSet(); plasmaWordIndexEntry indexEntry; plasmaCrawlLURL.Entry lurl; - while (urlEnum.hasMoreElements()) { - indexEntry = (plasmaWordIndexEntry) urlEnum.nextElement(); + while (urlIter.hasNext()) { + indexEntry = (plasmaWordIndexEntry) urlIter.next(); lurl = switchboard.urlPool.loadedURL.getEntry(indexEntry.getUrlHash()); if (lurl == null) { unknownURLEntries.add(indexEntry.getUrlHash()); @@ -442,15 +442,15 @@ public class IndexControl_p { if (index.size() == 0) { result.append("No URL entries related to this word hash ").append(keyhash).append("."); } else { - final Enumeration en = index.elements(true); + final Iterator en = index.elements(true); result.append("URL entries related to this word hash ").append(keyhash).append("

"); result.append("
"); String us, uh; int i = 0; final TreeMap tm = new TreeMap(); - while (en.hasMoreElements()) { - uh = ((plasmaWordIndexEntry)en.nextElement()).getUrlHash(); + while (en.hasNext()) { + uh = ((plasmaWordIndexEntry)en.next()).getUrlHash(); if (switchboard.urlPool.loadedURL.exists(uh)) { us = switchboard.urlPool.loadedURL.getEntry(uh).url().toString(); tm.put(us, uh); diff --git a/source/de/anomic/plasma/plasmaSearchEvent.java b/source/de/anomic/plasma/plasmaSearchEvent.java index 46bd79a8d..984646ccd 100644 --- a/source/de/anomic/plasma/plasmaSearchEvent.java +++ b/source/de/anomic/plasma/plasmaSearchEvent.java @@ -52,6 +52,8 @@ import java.io.IOException; import de.anomic.kelondro.kelondroException; import de.anomic.server.logging.serverLog; import de.anomic.server.serverCodings; +import de.anomic.server.serverInstantThread; +import de.anomic.yacy.yacySearch; public final class plasmaSearchEvent { @@ -60,6 +62,8 @@ public final class plasmaSearchEvent { private plasmaWordIndex wordIndex; private plasmaCrawlLURL urlStore; private plasmaSnippetCache snippetCache; + private plasmaWordIndexEntity rcLocal, rcGlobal; // caches for results + private yacySearch[] searchThreads; public plasmaSearchEvent(plasmaSearchQuery query, serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL urlStore, plasmaSnippetCache snippetCache) { this.log = log; @@ -67,9 +71,77 @@ public final class plasmaSearchEvent { this.query = query; this.urlStore = urlStore; this.snippetCache = snippetCache; + this.rcLocal = new plasmaWordIndexEntity(null); + this.rcGlobal = new plasmaWordIndexEntity(null); + this.searchThreads = null; } - public plasmaWordIndexEntity search(long time) throws IOException { + public plasmaSearchResult search() { + // combine all threads + + if (query.domType == plasmaSearchQuery.SEARCHDOM_GLOBALDHT) { + int fetchcount = ((int) (query.maximumTime / 1000L)) * 5; // number of wanted results until break in search + int fetchpeers = ((int) (query.maximumTime / 1000L)) * 2; // number of target peers; means 30 peers in 10 seconds + long fetchtime = query.maximumTime * 6 / 10; // time to waste + + // remember time + long start = System.currentTimeMillis(); + + // first trigger a local search within a separate thread + serverInstantThread.oneTimeJob(this, "localSearch", log, 0); + + // do a global search + int globalContributions = globalSearch(fetchcount, fetchpeers, fetchtime); + log.logFine("SEARCH TIME AFTER GLOBAL-TRIGGER TO " + fetchpeers + " PEERS: " + ((System.currentTimeMillis() - start) / 1000) + " seconds"); + + try { + // combine the result and order + long remainingTime = query.maximumTime - (System.currentTimeMillis() - start); + if (remainingTime < 500) remainingTime = 500; + if (remainingTime > 3000) remainingTime = 3000; + + plasmaSearchResult result = order(remainingTime, query.wantedResults); + result.globalContributions = globalContributions; + result.localContributions = rcLocal.size(); + + // flush results in a separate thread + serverInstantThread.oneTimeJob(this, "flushResults", log, 0); + + // clean up + if ((rcLocal != null) && (!(rcLocal.isTMPEntity()))) rcLocal.close(); + rcLocal = null; + + // return search result + return result; + } catch (IOException e) { + return null; + } + } else { + // do a local search + long start = System.currentTimeMillis(); + try { + localSearch(query.maximumTime); + plasmaSearchResult result = order(query.maximumTime - (System.currentTimeMillis() - start), query.wantedResults); + result.localContributions = rcLocal.size(); + + // clean up + if ((rcLocal != null) && (!(rcLocal.isTMPEntity()))) rcLocal.close(); + rcLocal = null; + + return result; + } catch (IOException e) { + return null; + } + } + } + + + public void localSearch() throws IOException { + // method called by a one-time + localSearch(query.maximumTime * 6 / 10); + } + + public int localSearch(long time) throws IOException { // search for the set of hashes and return an array of urlEntry elements long stamp = System.currentTimeMillis(); @@ -78,29 +150,66 @@ public final class plasmaSearchEvent { Set entities = wordIndex.getEntities(query.queryHashes, true, true); // since this is a conjunction we return an empty entity if any word is not known - if (entities == null) return new plasmaWordIndexEntity(null); + if (entities == null) { + rcLocal = new plasmaWordIndexEntity(null); + return 0; + } // join the result - return plasmaWordIndexEntity.joinEntities(entities, time - (System.currentTimeMillis() - stamp)); + long remainingTime = time - (System.currentTimeMillis() - stamp); + if (remainingTime < 1000) remainingTime = 1000; + rcLocal = plasmaWordIndexEntity.joinEntities(entities, remainingTime); + log.logFine("SEARCH TIME FOR FINDING " + rcLocal.size() + " ELEMENTS: " + ((System.currentTimeMillis() - stamp) / 1000) + " seconds"); + + return rcLocal.size(); } - public plasmaSearchResult order(plasmaWordIndexEntity searchResult, long maxTime, int minEntries) throws IOException { - // we collect the urlhashes from it and construct a List with urlEntry objects - // attention: if minEntries is too high, this method will not terminate within the maxTime + public int globalSearch(int fetchcount, int fetchpeers, long timelimit) { + // do global fetching + // the result of the fetch is then in the rcGlobal + if (fetchpeers < 10) fetchpeers = 10; + if (fetchcount > query.wantedResults * 10) fetchcount = query.wantedResults * 10; + + // set a duetime for clients + long duetime = timelimit - 4000; // subtract network traffic overhead, guessed 4 seconds + if (duetime < 1000) { duetime = 1000; } + + long timeout = System.currentTimeMillis() + timelimit; + searchThreads = yacySearch.searchHashes(query.queryHashes, urlStore, rcGlobal, fetchcount, fetchpeers, plasmaSwitchboard.urlBlacklist, snippetCache, duetime); + + // wait until wanted delay passed or wanted result appeared + while (System.currentTimeMillis() < timeout) { + // check if all threads have been finished or results so far are enough + if (rcGlobal.size() >= fetchcount * 3) break; // we have enough + if (yacySearch.remainingWaiting(searchThreads) == 0) break; // we cannot expect more + // wait a little time .. + try {Thread.currentThread().sleep(100);} catch (InterruptedException e) {} + } + + return rcGlobal.size(); + } + + public plasmaSearchResult order(long maxTime, int minEntries) throws IOException { + // we collect the urlhashes and construct a list with urlEntry objects + // attention: if minEntries is too high, this method will not terminate within the maxTime + plasmaWordIndexEntity searchResult = new plasmaWordIndexEntity(null); + searchResult.merge(rcLocal, -1); + searchResult.merge(rcGlobal, -1); + plasmaSearchResult acc = new plasmaSearchResult(query); if (searchResult == null) return acc; // strange case where searchResult is not proper: acc is then empty if (searchResult.size() == 0) return acc; // case that we have nothing to do - Enumeration e = searchResult.elements(true); + Iterator e = searchResult.elements(true); plasmaWordIndexEntry entry; long startCreateTime = System.currentTimeMillis(); plasmaCrawlLURL.Entry page; try { - while (e.hasMoreElements()) { + while (e.hasNext()) { if ((acc.sizeFetched() >= minEntries) && (System.currentTimeMillis() - startCreateTime >= maxTime)) break; - entry = (plasmaWordIndexEntry) e.nextElement(); + entry = (plasmaWordIndexEntry) e.next(); // find the url entry page = urlStore.getEntry(entry.getUrlHash()); // add a result @@ -111,10 +220,47 @@ public final class plasmaSearchEvent { } long startSortTime = System.currentTimeMillis(); acc.sortResults(); - serverLog.logFine("PLASMA", "plasmaSearch.order: minEntries = " + minEntries + ", effectiveEntries = " + acc.sizeOrdered() + ", demanded Time = " + maxTime + ", effectiveTime = " + (System.currentTimeMillis() - startCreateTime) + ", createTime = " + (startSortTime - startCreateTime) + ", sortTime = " + (System.currentTimeMillis() - startSortTime)); + serverLog.logFine("PLASMA", "plasmaSearchEvent.order: minEntries = " + minEntries + ", effectiveEntries = " + acc.sizeOrdered() + ", demanded Time = " + maxTime + ", effectiveTime = " + (System.currentTimeMillis() - startCreateTime) + ", createTime = " + (startSortTime - startCreateTime) + ", sortTime = " + (System.currentTimeMillis() - startSortTime)); return acc; } + public void flushResults() { + // put all new results into wordIndex + // this must be called after search results had been computed + // it is wise to call this within a separate thread because this method waits untill all + if (searchThreads == null) return; + + // wait untill all threads are finished + int remaining; + long starttime = System.currentTimeMillis(); + while ((remaining = yacySearch.remainingWaiting(searchThreads)) > 0) { + try {Thread.currentThread().sleep(5000);} catch (InterruptedException e) {} + if (System.currentTimeMillis() - starttime > 90000) { + yacySearch.interruptAlive(searchThreads); + serverLog.logFine("PLASMA", "SEARCH FLUSH: " + remaining + " PEERS STILL BUSY; ABANDONED"); + break; + } + } + + // now flush the rcGlobal into wordIndex + Iterator hashi = query.queryHashes.iterator(); + String wordHash; + while (hashi.hasNext()) { + wordHash = (String) hashi.next(); + Iterator i = rcGlobal.elements(true); + plasmaWordIndexEntry entry; + while (i.hasNext()) { + entry = (plasmaWordIndexEntry) i.next(); + wordIndex.addEntries(plasmaWordIndexEntryContainer.instantContainer(wordHash, System.currentTimeMillis(), entry), false); + } + } + serverLog.logFine("PLASMA", "FINISHED FLUSHING " + rcGlobal.size() + " GLOBAL SEARCH RESULTS"); + + // finally delete the temporary index + rcGlobal = null; + } + + /* public void preSearch() { plasmaWordIndexEntity idx = null; diff --git a/source/de/anomic/plasma/plasmaSearchResult.java b/source/de/anomic/plasma/plasmaSearchResult.java index 142a19003..1b7b14e78 100644 --- a/source/de/anomic/plasma/plasmaSearchResult.java +++ b/source/de/anomic/plasma/plasmaSearchResult.java @@ -60,12 +60,16 @@ public final class plasmaSearchResult { private kelondroMScoreCluster ref; // reference score computation for the commonSense heuristic private ArrayList results; // this is a buffer for plasmaWordIndexEntry + plasmaCrawlLURL.entry - objects private plasmaSearchQuery query; + public int globalContributions; + public int localContributions; public plasmaSearchResult(plasmaSearchQuery query) { this.pageAcc = new TreeMap(); this.ref = new kelondroMScoreCluster(); this.results = new ArrayList(); this.query = query; + this.globalContributions = 0; + this.localContributions = 0; } public plasmaSearchResult cloneSmart() { diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index 763407a9a..8d4b811e2 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -1440,10 +1440,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser serverObjects prop = new serverObjects(); try { - //char[] order = new char[2]; - //if (query.order[0].equals("quality")) order[0] = plasmaSearchResult.O_QUALITY; else order[0] = plasmaSearchResult.O_AGE; - //if (query.order[1].equals("quality")) order[1] = plasmaSearchResult.O_QUALITY; else order[1] = plasmaSearchResult.O_AGE; - // filter out words that appear in bluelist query.filterOut(blueList); @@ -1458,30 +1454,11 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser // preselect.start(); //} - // do global fetching - int globalresults = 0; - if (query.domType == plasmaSearchQuery.SEARCHDOM_GLOBALDHT) { - int fetchcount = ((int) (query.maximumTime / 1000L)) * 5; // number of wanted results until break in search - int fetchpeers = ((int) (query.maximumTime / 1000L)) * 2; // number of target peers; means 30 peers in 10 seconds - long fetchtime = query.maximumTime * 6 / 10; // time to waste - if (fetchpeers < 10) fetchpeers = 10; - if (fetchcount > query.wantedResults * 10) fetchcount = query.wantedResults * 10; - globalresults = yacySearch.searchHashes(query.queryHashes, urlPool.loadedURL, wordIndex, fetchcount, fetchpeers, urlBlacklist, snippetCache, fetchtime); - log.logFine("SEARCH TIME AFTER GLOBAL-TRIGGER TO " + fetchpeers + " PEERS: " + ((System.currentTimeMillis() - timestamp) / 1000) + " seconds"); - } - prop.put("globalresults", globalresults); // the result are written to the local DB - - - // now search locally (the global results should be now in the local db) - long remainingTime = query.maximumTime - (System.currentTimeMillis() - timestamp); + // create a new search event plasmaSearchEvent theSearch = new plasmaSearchEvent(query, log, wordIndex, urlPool.loadedURL, snippetCache); - plasmaWordIndexEntity idx = theSearch.search(remainingTime * 8 / 10); - log.logFine("SEARCH TIME AFTER FINDING " + idx.size() + " ELEMENTS: " + ((System.currentTimeMillis() - timestamp) / 1000) + " seconds"); + plasmaSearchResult acc = theSearch.search(); - remainingTime = query.maximumTime - (System.currentTimeMillis() - timestamp); - if (remainingTime < 500) remainingTime = 500; - if (remainingTime > 3000) remainingTime = 3000; - plasmaSearchResult acc = theSearch.order(idx, remainingTime, 10); + // fetch snippets if (query.domType != plasmaSearchQuery.SEARCHDOM_GLOBALDHT) snippetCache.fetch(acc.cloneSmart(), query.queryHashes, query.urlMask, 10); log.logFine("SEARCH TIME AFTER ORDERING OF SEARCH RESULT: " + ((System.currentTimeMillis() - timestamp) / 1000) + " seconds"); @@ -1492,7 +1469,8 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser prop.put("orderedcount", "0"); prop.put("linkcount", "0"); } else { - prop.put("totalcount", Integer.toString(idx.size())); + prop.put("globalresults", acc.globalContributions); + prop.put("totalcount", acc.globalContributions + acc.localContributions); prop.put("orderedcount", Integer.toString(acc.sizeOrdered())); int i = 0; int p; @@ -1564,7 +1542,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser log.logFine("SEARCH TIME AFTER RESULT PREPARATION: " + ((System.currentTimeMillis() - timestamp) / 1000) + " seconds"); // calc some more cross-reference - remainingTime = query.maximumTime - (System.currentTimeMillis() - timestamp); + long remainingTime = query.maximumTime - (System.currentTimeMillis() - timestamp); if (remainingTime < 0) remainingTime = 1000; /* while ((acc.hasMoreElements()) && (((time + timestamp) < System.currentTimeMillis()))) { @@ -1596,7 +1574,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser prop.get("orderedcount", "0") + " links ordered, " + prop.get("linkcount", "?") + " links selected, " + ((System.currentTimeMillis() - timestamp) / 1000) + " seconds"); - if (idx != null) idx.close(); return prop; } catch (IOException e) { return null; @@ -1614,10 +1591,10 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser log.logInfo("INIT HASH SEARCH: " + query.queryHashes + " - " + query.wantedResults + " links"); long timestamp = System.currentTimeMillis(); plasmaSearchEvent theSearch = new plasmaSearchEvent(query, log, wordIndex, urlPool.loadedURL, snippetCache); - plasmaWordIndexEntity idx = theSearch.search(query.maximumTime * 8 / 10); + int idxc = theSearch.localSearch(query.maximumTime * 8 / 10); long remainingTime = query.maximumTime - (System.currentTimeMillis() - timestamp); if (remainingTime < 500) remainingTime = 500; - plasmaSearchResult acc = theSearch.order(idx, remainingTime, 10); + plasmaSearchResult acc = theSearch.order(remainingTime, 10); // result is a List of urlEntry elements if (acc == null) { @@ -1665,11 +1642,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser prop.put("fwrec", ""); // peers that would have helped to construct this result (recommendations) // log - log.logInfo("EXIT HASH SEARCH: " + query.queryHashes + " - " + - ((idx == null) ? "0" : (""+idx.size())) + " links found, " + + log.logInfo("EXIT HASH SEARCH: " + query.queryHashes + " - " + idxc + " links found, " + prop.get("linkcount", "?") + " links selected, " + ((System.currentTimeMillis() - timestamp) / 1000) + " seconds"); - if (idx != null) idx.close(); return prop; } catch (IOException e) { return null; diff --git a/source/de/anomic/plasma/plasmaWordIndexCache.java b/source/de/anomic/plasma/plasmaWordIndexCache.java index d3daade4d..f7d1a27ba 100644 --- a/source/de/anomic/plasma/plasmaWordIndexCache.java +++ b/source/de/anomic/plasma/plasmaWordIndexCache.java @@ -570,10 +570,10 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { return -container.size(); } else { // the combined container will fit, read the container - Enumeration entries = entity.elements(true); + Iterator entries = entity.elements(true); plasmaWordIndexEntry entry; - while (entries.hasMoreElements()) { - entry = (plasmaWordIndexEntry) entries.nextElement(); + while (entries.hasNext()) { + entry = (plasmaWordIndexEntry) entries.next(); container.add(new plasmaWordIndexEntry[]{entry}, System.currentTimeMillis()); } // we have read all elements, now delete the entity diff --git a/source/de/anomic/plasma/plasmaWordIndexDistribution.java b/source/de/anomic/plasma/plasmaWordIndexDistribution.java index e9d5f58bc..9abbe1ef2 100644 --- a/source/de/anomic/plasma/plasmaWordIndexDistribution.java +++ b/source/de/anomic/plasma/plasmaWordIndexDistribution.java @@ -301,7 +301,7 @@ public final class plasmaWordIndexDistribution { int currOpenFiles = 0; Iterator wordHashIterator = this.wordIndex.wordHashes(hash, true, true); plasmaWordIndexEntity indexEntity, tmpEntity; - Enumeration urlEnum; + Iterator urlIter; Iterator hashIter; plasmaWordIndexEntry indexEntry; plasmaCrawlLURL.Entry lurl; @@ -322,10 +322,10 @@ public final class plasmaWordIndexDistribution { // take the whole entity try { // fist check if we know all urls - urlEnum = indexEntity.elements(true); + urlIter = indexEntity.elements(true); unknownURLEntries.clear(); - while (urlEnum.hasMoreElements()) { - indexEntry = (plasmaWordIndexEntry) urlEnum.nextElement(); + while (urlIter.hasNext()) { + indexEntry = (plasmaWordIndexEntry) urlIter.next(); lurl = this.urlPool.loadedURL.getEntry(indexEntry.getUrlHash()); if ((lurl == null) || (lurl.toString() == null)) { unknownURLEntries.add(indexEntry.getUrlHash()); @@ -361,10 +361,10 @@ public final class plasmaWordIndexDistribution { // make an on-the-fly entity and insert values tmpEntity = new plasmaWordIndexEntity(indexEntity.wordHash()); try { - urlEnum = indexEntity.elements(true); + urlIter = indexEntity.elements(true); unknownURLEntries.clear(); - while ((urlEnum.hasMoreElements()) && (count > 0)) { - indexEntry = (plasmaWordIndexEntry) urlEnum.nextElement(); + while ((urlIter.hasNext()) && (count > 0)) { + indexEntry = (plasmaWordIndexEntry) urlIter.next(); lurl = this.urlPool.loadedURL.getEntry(indexEntry.getUrlHash()); if (lurl == null) { unknownURLEntries.add(indexEntry.getUrlHash()); @@ -410,7 +410,7 @@ public final class plasmaWordIndexDistribution { boolean deleteTransferIndexes(plasmaWordIndexEntity[] indexEntities) throws IOException { String wordhash; - Enumeration urlEnum; + Iterator urlIter; plasmaWordIndexEntry indexEntry; plasmaWordIndexEntity indexEntity; String[] urlHashes; @@ -421,9 +421,9 @@ public final class plasmaWordIndexDistribution { // delete entries separately int c = 0; urlHashes = new String[indexEntities[i].size()]; - urlEnum = indexEntities[i].elements(true); - while (urlEnum.hasMoreElements()) { - indexEntry = (plasmaWordIndexEntry) urlEnum.nextElement(); + urlIter = indexEntities[i].elements(true); + while (urlIter.hasNext()) { + indexEntry = (plasmaWordIndexEntry) urlIter.next(); urlHashes[c++] = indexEntry.getUrlHash(); } wordIndex.removeEntries(indexEntities[i].wordHash(), urlHashes, true); diff --git a/source/de/anomic/plasma/plasmaWordIndexEntity.java b/source/de/anomic/plasma/plasmaWordIndexEntity.java index 45ac236b7..0947f9aba 100644 --- a/source/de/anomic/plasma/plasmaWordIndexEntity.java +++ b/source/de/anomic/plasma/plasmaWordIndexEntity.java @@ -43,7 +43,6 @@ package de.anomic.plasma; import java.io.File; import java.io.IOException; -import java.util.Enumeration; import java.util.Iterator; import java.util.TreeMap; import java.util.Set; @@ -231,12 +230,12 @@ public final class plasmaWordIndexEntity { } } - public Enumeration elements(boolean up) { + public Iterator elements(boolean up) { // returns an enumeration of plasmaWordIndexEntry objects if (theTmpMap == null) return new dbenum(up); else return new tmpenum(up); } - public final class dbenum implements Enumeration { + public final class dbenum implements Iterator { Iterator i; public dbenum(boolean up) { try { @@ -247,10 +246,10 @@ public final class plasmaWordIndexEntity { i = null; } } - public boolean hasMoreElements() { + public boolean hasNext() { return (i != null) && (i.hasNext()); } - public Object nextElement() { + public Object next() { if (i == null) return null; try { byte[][] n = ((kelondroRecords.Node) i.next()).getValues(); @@ -263,22 +262,28 @@ public final class plasmaWordIndexEntity { throw new RuntimeException("dbenum: " + e.getMessage()); } } + public void remove() { + throw new UnsupportedOperationException(); + } } - public final class tmpenum implements Enumeration { + public final class tmpenum implements Iterator { final TreeMap searchTree; boolean up; public tmpenum(boolean up) { this.up = up; searchTree = (TreeMap) theTmpMap.clone(); // a shallow clone that is destroyed during search } - public boolean hasMoreElements() { + public boolean hasNext() { return searchTree.size() > 0; } - public Object nextElement() { + public Object next() { Object urlHash = (up) ? searchTree.firstKey() : searchTree.lastKey(); plasmaWordIndexEntry entry = (plasmaWordIndexEntry) searchTree.remove(urlHash); return entry; } + public void remove() { + throw new UnsupportedOperationException(); + } } public String toString() { @@ -294,6 +299,17 @@ public final class plasmaWordIndexEntity { return l; } + public void merge(plasmaWordIndexEntity otherEntity, long time) throws IOException { + // this is a merge of another entity to this entity + // the merge is interrupted when the given time is over + // a time=-1 means: no timeout + Iterator i = otherEntity.elements(true); + long timeout = (time == -1) ? Long.MAX_VALUE : System.currentTimeMillis() + time; + while ((i.hasNext()) && (System.currentTimeMillis() < timeout)) { + addEntry((plasmaWordIndexEntry) i.next()); + } + } + public static plasmaWordIndexEntity joinEntities(Set entities, long time) throws IOException { long stamp = System.currentTimeMillis(); @@ -366,12 +382,12 @@ public final class plasmaWordIndexEntity { private static plasmaWordIndexEntity joinConstructiveByTest(plasmaWordIndexEntity small, plasmaWordIndexEntity large, long time) throws IOException { System.out.println("DEBUG: JOIN METHOD BY TEST"); plasmaWordIndexEntity conj = new plasmaWordIndexEntity(null); // start with empty search result - Enumeration se = small.elements(true); + Iterator se = small.elements(true); plasmaWordIndexEntry ie; long stamp = System.currentTimeMillis(); try { - while ((se.hasMoreElements()) && ((System.currentTimeMillis() - stamp) < time)) { - ie = (plasmaWordIndexEntry) se.nextElement(); + while ((se.hasNext()) && ((System.currentTimeMillis() - stamp) < time)) { + ie = (plasmaWordIndexEntry) se.next(); if (large.contains(ie)) conj.addEntry(ie); } } catch (kelondroException e) { @@ -385,21 +401,21 @@ public final class plasmaWordIndexEntity { private static plasmaWordIndexEntity joinConstructiveByEnumeration(plasmaWordIndexEntity i1, plasmaWordIndexEntity i2, long time) throws IOException { System.out.println("DEBUG: JOIN METHOD BY ENUMERATION"); plasmaWordIndexEntity conj = new plasmaWordIndexEntity(null); // start with empty search result - Enumeration e1 = i1.elements(true); - Enumeration e2 = i2.elements(true); + Iterator e1 = i1.elements(true); + Iterator e2 = i2.elements(true); int c; - if ((e1.hasMoreElements()) && (e2.hasMoreElements())) { + if ((e1.hasNext()) && (e2.hasNext())) { plasmaWordIndexEntry ie1; plasmaWordIndexEntry ie2; try { - ie1 = (plasmaWordIndexEntry) e1.nextElement(); + ie1 = (plasmaWordIndexEntry) e1.next(); } catch (kelondroException e) { //serverLog.logSevere("PLASMA", "joinConstructiveByEnumeration: Database corrupt 1 (" + e.getMessage() + "), deleting index"); i1.deleteComplete(); return conj; } try { - ie2 = (plasmaWordIndexEntry) e2.nextElement(); + ie2 = (plasmaWordIndexEntry) e2.next(); } catch (kelondroException e) { //serverLog.logSevere("PLASMA", "joinConstructiveByEnumeration: Database corrupt 2 (" + e.getMessage() + "), deleting index"); i2.deleteComplete(); @@ -410,7 +426,7 @@ public final class plasmaWordIndexEntity { c = ie1.getUrlHash().compareTo(ie2.getUrlHash()); if (c < 0) { try { - if (e1.hasMoreElements()) ie1 = (plasmaWordIndexEntry) e1.nextElement(); else break; + if (e1.hasNext()) ie1 = (plasmaWordIndexEntry) e1.next(); else break; } catch (kelondroException e) { //serverLog.logSevere("PLASMA", "joinConstructiveByEnumeration: Database 1 corrupt (" + e.getMessage() + "), deleting index"); i1.deleteComplete(); @@ -418,7 +434,7 @@ public final class plasmaWordIndexEntity { } } else if (c > 0) { try { - if (e2.hasMoreElements()) ie2 = (plasmaWordIndexEntry) e2.nextElement(); else break; + if (e2.hasNext()) ie2 = (plasmaWordIndexEntry) e2.next(); else break; } catch (kelondroException e) { //serverLog.logSevere("PLASMA", "joinConstructiveByEnumeration: Database 2 corrupt (" + e.getMessage() + "), deleting index"); i2.deleteComplete(); @@ -428,14 +444,14 @@ public final class plasmaWordIndexEntity { // we have found the same urls in different searches! conj.addEntry(ie1); try { - if (e1.hasMoreElements()) ie1 = (plasmaWordIndexEntry) e1.nextElement(); else break; + if (e1.hasNext()) ie1 = (plasmaWordIndexEntry) e1.next(); else break; } catch (kelondroException e) { //serverLog.logSevere("PLASMA", "joinConstructiveByEnumeration: Database 1 corrupt (" + e.getMessage() + "), deleting index"); i1.deleteComplete(); break; } try { - if (e2.hasMoreElements()) ie2 = (plasmaWordIndexEntry) e2.nextElement(); else break; + if (e2.hasNext()) ie2 = (plasmaWordIndexEntry) e2.next(); else break; } catch (kelondroException e) { //serverLog.logSevere("PLASMA", "joinConstructiveByEnumeration: Database 2 corrupt (" + e.getMessage() + "), deleting index"); i2.deleteComplete(); diff --git a/source/de/anomic/yacy/yacyClient.java b/source/de/anomic/yacy/yacyClient.java index 54b71a07e..f8a13d0df 100644 --- a/source/de/anomic/yacy/yacyClient.java +++ b/source/de/anomic/yacy/yacyClient.java @@ -49,6 +49,8 @@ import java.util.ArrayList; import java.util.Date; import java.util.Enumeration; import java.util.HashMap; +import java.util.Iterator; + import de.anomic.http.httpc; import de.anomic.plasma.plasmaCrawlLURL; import de.anomic.plasma.plasmaSnippetCache; @@ -270,11 +272,9 @@ public final class yacyClient { } } - public static int search(String wordhashes, int count, boolean global, - yacySeed targetPeer, plasmaCrawlLURL urlManager, - plasmaWordIndex wordIndex, plasmaURLPattern blacklist, - plasmaSnippetCache snippets, - long duetime) { + public static int search(String wordhashes, int count, boolean global, yacySeed targetPeer, + plasmaCrawlLURL urlManager, plasmaWordIndexEntity entityCache, + plasmaURLPattern blacklist, plasmaSnippetCache snippets, long duetime) { // send a search request to peer with remote Hash // this mainly converts the words into word hashes @@ -374,7 +374,7 @@ public final class yacyClient { } // finally insert the containers to the index - for (int m = 0; m < words; m++) { wordIndex.addEntries(container[m], true); } + for (int m = 0; m < words; m++) { entityCache.addEntries(container[m]); } // generate statistics long searchtime; @@ -383,7 +383,7 @@ public final class yacyClient { } catch (NumberFormatException e) { searchtime = totalrequesttime; } - yacyCore.log.logFine("yacyClient.search: processed " + results + " links from peer " + targetPeer.hash + ", score=" + targetPeer.selectscore + ", DHTdist=" + yacyDHTAction.dhtDistance(targetPeer.hash, wordhashes) + ", duetime=" + duetime + ", searchtime=" + searchtime + ", netdelay=" + (totalrequesttime - searchtime) + ", references=" + result.get("references")); + yacyCore.log.logFine("yacyClient.search: processed " + results + " links from peer " + targetPeer.hash + ":" + targetPeer.getName() + ", score=" + targetPeer.selectscore + ", DHTdist=" + yacyDHTAction.dhtDistance(targetPeer.hash, wordhashes) + ", duetime=" + duetime + ", searchtime=" + searchtime + ", netdelay=" + (totalrequesttime - searchtime) + ", references=" + result.get("references")); return results; } catch (Exception e) { yacyCore.log.logSevere("yacyClient.search error: '" + targetPeer.get("Name", "anonymous") + "' failed - " + e); @@ -596,12 +596,12 @@ public final class yacyClient { post.put("wordc", Integer.toString(indexes.length)); int indexcount = 0; final StringBuffer entrypost = new StringBuffer(indexes.length*73); - Enumeration eenum; + Iterator eenum; plasmaWordIndexEntry entry; for (int i = 0; i < indexes.length; i++) { eenum = indexes[i].elements(true); - while (eenum.hasMoreElements()) { - entry = (plasmaWordIndexEntry) eenum.nextElement(); + while (eenum.hasNext()) { + entry = (plasmaWordIndexEntry) eenum.next(); entrypost.append(indexes[i].wordHash()) .append(entry.toExternalForm()) .append(serverCore.crlfString); diff --git a/source/de/anomic/yacy/yacySearch.java b/source/de/anomic/yacy/yacySearch.java index 64ea54baf..3f59e053a 100644 --- a/source/de/anomic/yacy/yacySearch.java +++ b/source/de/anomic/yacy/yacySearch.java @@ -51,8 +51,8 @@ import java.util.HashMap; import de.anomic.kelondro.kelondroMScoreCluster; import de.anomic.plasma.plasmaCrawlLURL; import de.anomic.plasma.plasmaURLPattern; -import de.anomic.plasma.plasmaWordIndex; import de.anomic.plasma.plasmaSnippetCache; +import de.anomic.plasma.plasmaWordIndexEntity; import de.anomic.server.logging.serverLog; public class yacySearch extends Thread { @@ -61,7 +61,7 @@ public class yacySearch extends Thread { final private int count; final private boolean global; final private plasmaCrawlLURL urlManager; - final private plasmaWordIndex wordIndex; + final private plasmaWordIndexEntity entityCache; final private plasmaURLPattern blacklist; final private plasmaSnippetCache snippetCache; final private yacySeed targetPeer; @@ -69,13 +69,13 @@ public class yacySearch extends Thread { final private long duetime; public yacySearch(Set wordhashes, int count, boolean global, yacySeed targetPeer, - plasmaCrawlLURL urlManager, plasmaWordIndex wordIndex, plasmaURLPattern blacklist, plasmaSnippetCache snippetCache, long duetime) { + plasmaCrawlLURL urlManager, plasmaWordIndexEntity entityCache, plasmaURLPattern blacklist, plasmaSnippetCache snippetCache, long duetime) { super("yacySearch_" + targetPeer.getName()); this.wordhashes = wordhashes; this.count = count; this.global = global; this.urlManager = urlManager; - this.wordIndex = wordIndex; + this.entityCache = entityCache; this.blacklist = blacklist; this.snippetCache = snippetCache; this.targetPeer = targetPeer; @@ -84,7 +84,7 @@ public class yacySearch extends Thread { } public void run() { - this.links = yacyClient.search(set2string(wordhashes), count, global, targetPeer, urlManager, wordIndex, blacklist, snippetCache, duetime); + this.links = yacyClient.search(set2string(wordhashes), count, global, targetPeer, urlManager, entityCache, blacklist, snippetCache, duetime); if (links != 0) { //yacyCore.log.logInfo("REMOTE SEARCH - remote peer " + targetPeer.hash + ":" + targetPeer.getName() + " contributed " + links + " links for word hash " + wordhashes); yacyCore.seedDB.mySeed.incRI(links); @@ -165,65 +165,49 @@ public class yacySearch extends Thread { return result; } - public static int searchHashes(Set wordhashes, plasmaCrawlLURL urlManager, plasmaWordIndex wordIndex, - int count, int targets, plasmaURLPattern blacklist, plasmaSnippetCache snippetCache, long waitingtime) { + public static yacySearch[] searchHashes(Set wordhashes, plasmaCrawlLURL urlManager, plasmaWordIndexEntity entityCache, + int count, int targets, plasmaURLPattern blacklist, plasmaSnippetCache snippetCache, long duetime) { // check own peer status - if (yacyCore.seedDB.mySeed == null || yacyCore.seedDB.mySeed.getAddress() == null) { return 0; } - - // start delay control - final long start = System.currentTimeMillis(); - - // set a duetime for clients - long duetime = waitingtime - 4000; // subtract network traffic overhead, guessed 4 seconds - if (duetime < 1000) { duetime = 1000; } + if (yacyCore.seedDB.mySeed == null || yacyCore.seedDB.mySeed.getAddress() == null) { return null; } // prepare seed targets and threads //Set wordhashes = plasmaSearch.words2hashes(querywords); final yacySeed[] targetPeers = selectPeers(wordhashes, targets); - if (targetPeers == null) { return 0; } + if (targetPeers == null) return null; targets = targetPeers.length; - if (targets == 0) { return 0; } + if (targets == 0) return null; yacySearch[] searchThreads = new yacySearch[targets]; for (int i = 0; i < targets; i++) { searchThreads[i]= new yacySearch(wordhashes, count, true, targetPeers[i], - urlManager, wordIndex, blacklist, snippetCache, duetime); + urlManager, entityCache, blacklist, snippetCache, duetime); searchThreads[i].start(); try {Thread.currentThread().sleep(20);} catch (InterruptedException e) {} - if ((System.currentTimeMillis() - start) > waitingtime) { - targets = i + 1; - break; - } - } - int c; - // wait until wanted delay passed or wanted result appeared - boolean anyIdle = true; - while ((anyIdle) && ((System.currentTimeMillis() - start) < waitingtime)) { - // check if all threads have been finished or results so far are enough - c = 0; - anyIdle = false; - for (int i = 0; i < targets; i++) { - if (searchThreads[i].links() < 0) { - anyIdle = true; - } else { - c = c + searchThreads[i].links(); - } - } - if ((c >= count * 3) && ((System.currentTimeMillis() - start) > (waitingtime * 2 / 3))) { - yacyCore.log.logFine("DEBUG yacySearch: c=" + c + ", count=" + count + ", waitingtime=" + waitingtime); - break; // we have enough - } - if (c >= count * 5) { break; } - // wait a little time .. - try {Thread.currentThread().sleep(100);} catch (InterruptedException e) {} } - - // collect results - c = 0; - for (int i = 0; i < targets; i++) { - c = c + ((searchThreads[i].links() > 0) ? searchThreads[i].links() : 0); + return searchThreads; + } + + public static int remainingWaiting(yacySearch[] searchThreads) { + int alive = 0; + for (int i = 0; i < searchThreads.length; i++) { + if (searchThreads[i].isAlive()) alive++; } - return c; + return alive; } - + + public static int collectedLinks(yacySearch[] searchThreads) { + int links = 0; + for (int i = 0; i < searchThreads.length; i++) { + if (!(searchThreads[i].isAlive())) links += searchThreads[i].links; + } + return links; + } + + public static void interruptAlive(yacySearch[] searchThreads) { + for (int i = 0; i < searchThreads.length; i++) { + if (searchThreads[i].isAlive()) searchThreads[i].interrupt(); + } + } + + } diff --git a/source/yacy.java b/source/yacy.java index 1c5f8d329..9b4831d05 100644 --- a/source/yacy.java +++ b/source/yacy.java @@ -750,16 +750,16 @@ public final class yacy { plasmaWordIndexEntryContainer newContainer = new plasmaWordIndexEntryContainer(wordHash,importWordIdxEntity.size()); // the combined container will fit, read the container - Enumeration importWordIdxEntries = importWordIdxEntity.elements(true); + Iterator importWordIdxEntries = importWordIdxEntity.elements(true); plasmaWordIndexEntry importWordIdxEntry; - while (importWordIdxEntries.hasMoreElements()) { + while (importWordIdxEntries.hasNext()) { // testing if import process was aborted if (Thread.interrupted()) break; // getting next word index entry entryCounter++; - importWordIdxEntry = (plasmaWordIndexEntry) importWordIdxEntries.nextElement(); + importWordIdxEntry = (plasmaWordIndexEntry) importWordIdxEntries.next(); String urlHash = importWordIdxEntry.getUrlHash(); if ((importUrlDB.exists(urlHash)) && (!homeUrlDB.exists(urlHash))) { urlCounter++; @@ -866,10 +866,10 @@ public final class yacy { wordIdxEntity = wordIndex.getEntity(wordhash, true); // the combined container will fit, read the container - Enumeration wordIdxEntries = wordIdxEntity.elements(true); + Iterator wordIdxEntries = wordIdxEntity.elements(true); plasmaWordIndexEntry wordIdxEntry; - while (wordIdxEntries.hasMoreElements()) { - wordIdxEntry = (plasmaWordIndexEntry) wordIdxEntries.nextElement(); + while (wordIdxEntries.hasNext()) { + wordIdxEntry = (plasmaWordIndexEntry) wordIdxEntries.next(); String urlHash = wordIdxEntry.getUrlHash(); if ((currentUrlDB.exists(urlHash)) && (!minimizedUrlDB.exists(urlHash))) { urlCounter++;