changed search process: received indexes are now buffered and written to wordIndex after search

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@934 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 20 years ago
parent 91e676744e
commit 6260942590

@ -152,11 +152,11 @@ public class IndexControl_p {
plasmaWordIndexEntity index = null;
try {
index = switchboard.wordIndex.getEntity(keyhash, true);
Enumeration en = index.elements(true);
Iterator en = index.elements(true);
int i = 0;
urlx = new String[index.size()];
while (en.hasMoreElements()) {
urlx[i++] = ((plasmaWordIndexEntry) en.nextElement()).getUrlHash();
while (en.hasNext()) {
urlx[i++] = ((plasmaWordIndexEntry) en.next()).getUrlHash();
}
index.close();
index = null;
@ -260,13 +260,13 @@ public class IndexControl_p {
long starttime = System.currentTimeMillis();
indexes[0] = switchboard.wordIndex.getEntity(keyhash, true);
// built urlCache
Enumeration urlEnum = indexes[0].elements(true);
Iterator urlIter = indexes[0].elements(true);
HashMap knownURLs = new HashMap();
HashSet unknownURLEntries = new HashSet();
plasmaWordIndexEntry indexEntry;
plasmaCrawlLURL.Entry lurl;
while (urlEnum.hasMoreElements()) {
indexEntry = (plasmaWordIndexEntry) urlEnum.nextElement();
while (urlIter.hasNext()) {
indexEntry = (plasmaWordIndexEntry) urlIter.next();
lurl = switchboard.urlPool.loadedURL.getEntry(indexEntry.getUrlHash());
if (lurl == null) {
unknownURLEntries.add(indexEntry.getUrlHash());
@ -442,15 +442,15 @@ public class IndexControl_p {
if (index.size() == 0) {
result.append("No URL entries related to this word hash <span class=\"tt\">").append(keyhash).append("</span>.");
} else {
final Enumeration en = index.elements(true);
final Iterator en = index.elements(true);
result.append("URL entries related to this word hash <span class=\"tt\">").append(keyhash).append("</span><br><br>");
result.append("<form action=\"IndexControl_p.html\" method=\"post\" enctype=\"multipart/form-data\">");
String us, uh;
int i = 0;
final TreeMap tm = new TreeMap();
while (en.hasMoreElements()) {
uh = ((plasmaWordIndexEntry)en.nextElement()).getUrlHash();
while (en.hasNext()) {
uh = ((plasmaWordIndexEntry)en.next()).getUrlHash();
if (switchboard.urlPool.loadedURL.exists(uh)) {
us = switchboard.urlPool.loadedURL.getEntry(uh).url().toString();
tm.put(us, uh);

@ -52,6 +52,8 @@ import java.io.IOException;
import de.anomic.kelondro.kelondroException;
import de.anomic.server.logging.serverLog;
import de.anomic.server.serverCodings;
import de.anomic.server.serverInstantThread;
import de.anomic.yacy.yacySearch;
public final class plasmaSearchEvent {
@ -60,6 +62,8 @@ public final class plasmaSearchEvent {
private plasmaWordIndex wordIndex;
private plasmaCrawlLURL urlStore;
private plasmaSnippetCache snippetCache;
private plasmaWordIndexEntity rcLocal, rcGlobal; // caches for results
private yacySearch[] searchThreads;
public plasmaSearchEvent(plasmaSearchQuery query, serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL urlStore, plasmaSnippetCache snippetCache) {
this.log = log;
@ -67,9 +71,77 @@ public final class plasmaSearchEvent {
this.query = query;
this.urlStore = urlStore;
this.snippetCache = snippetCache;
this.rcLocal = new plasmaWordIndexEntity(null);
this.rcGlobal = new plasmaWordIndexEntity(null);
this.searchThreads = null;
}
public plasmaWordIndexEntity search(long time) throws IOException {
public plasmaSearchResult search() {
// combine all threads
if (query.domType == plasmaSearchQuery.SEARCHDOM_GLOBALDHT) {
int fetchcount = ((int) (query.maximumTime / 1000L)) * 5; // number of wanted results until break in search
int fetchpeers = ((int) (query.maximumTime / 1000L)) * 2; // number of target peers; means 30 peers in 10 seconds
long fetchtime = query.maximumTime * 6 / 10; // time to waste
// remember time
long start = System.currentTimeMillis();
// first trigger a local search within a separate thread
serverInstantThread.oneTimeJob(this, "localSearch", log, 0);
// do a global search
int globalContributions = globalSearch(fetchcount, fetchpeers, fetchtime);
log.logFine("SEARCH TIME AFTER GLOBAL-TRIGGER TO " + fetchpeers + " PEERS: " + ((System.currentTimeMillis() - start) / 1000) + " seconds");
try {
// combine the result and order
long remainingTime = query.maximumTime - (System.currentTimeMillis() - start);
if (remainingTime < 500) remainingTime = 500;
if (remainingTime > 3000) remainingTime = 3000;
plasmaSearchResult result = order(remainingTime, query.wantedResults);
result.globalContributions = globalContributions;
result.localContributions = rcLocal.size();
// flush results in a separate thread
serverInstantThread.oneTimeJob(this, "flushResults", log, 0);
// clean up
if ((rcLocal != null) && (!(rcLocal.isTMPEntity()))) rcLocal.close();
rcLocal = null;
// return search result
return result;
} catch (IOException e) {
return null;
}
} else {
// do a local search
long start = System.currentTimeMillis();
try {
localSearch(query.maximumTime);
plasmaSearchResult result = order(query.maximumTime - (System.currentTimeMillis() - start), query.wantedResults);
result.localContributions = rcLocal.size();
// clean up
if ((rcLocal != null) && (!(rcLocal.isTMPEntity()))) rcLocal.close();
rcLocal = null;
return result;
} catch (IOException e) {
return null;
}
}
}
public void localSearch() throws IOException {
    // Entry point for the background job started via serverInstantThread.oneTimeJob
    // in search(): runs the local search using 60% of the query's time budget.
    long budget = query.maximumTime * 6 / 10;
    localSearch(budget);
}
public int localSearch(long time) throws IOException {
// search for the set of hashes and return an array of urlEntry elements
long stamp = System.currentTimeMillis();
@ -78,29 +150,66 @@ public final class plasmaSearchEvent {
Set entities = wordIndex.getEntities(query.queryHashes, true, true);
// since this is a conjunction we return an empty entity if any word is not known
if (entities == null) return new plasmaWordIndexEntity(null);
if (entities == null) {
rcLocal = new plasmaWordIndexEntity(null);
return 0;
}
// join the result
return plasmaWordIndexEntity.joinEntities(entities, time - (System.currentTimeMillis() - stamp));
long remainingTime = time - (System.currentTimeMillis() - stamp);
if (remainingTime < 1000) remainingTime = 1000;
rcLocal = plasmaWordIndexEntity.joinEntities(entities, remainingTime);
log.logFine("SEARCH TIME FOR FINDING " + rcLocal.size() + " ELEMENTS: " + ((System.currentTimeMillis() - stamp) / 1000) + " seconds");
return rcLocal.size();
}
public plasmaSearchResult order(plasmaWordIndexEntity searchResult, long maxTime, int minEntries) throws IOException {
// we collect the urlhashes from it and construct a List with urlEntry objects
// attention: if minEntries is too high, this method will not terminate within the maxTime
/**
 * Triggers a distributed (global DHT) search on remote peers and waits for results.
 * Remote hits are collected asynchronously into {@code rcGlobal} by the search threads.
 *
 * @param fetchcount wanted number of results before breaking the wait (clamped to wantedResults * 10)
 * @param fetchpeers number of target peers to query (raised to a minimum of 10)
 * @param timelimit  total time in milliseconds this method may block
 * @return the number of entries collected in rcGlobal so far (threads may still add more afterwards)
 */
public int globalSearch(int fetchcount, int fetchpeers, long timelimit) {
// do global fetching
// the result of the fetch is then in the rcGlobal
if (fetchpeers < 10) fetchpeers = 10;
if (fetchcount > query.wantedResults * 10) fetchcount = query.wantedResults * 10;
// set a duetime for clients
long duetime = timelimit - 4000; // subtract network traffic overhead, guessed 4 seconds
if (duetime < 1000) { duetime = 1000; }
long timeout = System.currentTimeMillis() + timelimit;
// start one search thread per selected peer; threads append hits to rcGlobal
searchThreads = yacySearch.searchHashes(query.queryHashes, urlStore, rcGlobal, fetchcount, fetchpeers, plasmaSwitchboard.urlBlacklist, snippetCache, duetime);
// NOTE(review): searchHashes can return null (no peers / no own address, see yacySearch);
// remainingWaiting(null) below may then NPE — confirm remainingWaiting handles null.
// wait until wanted delay passed or wanted result appeared
while (System.currentTimeMillis() < timeout) {
// check if all threads have been finished or results so far are enough
if (rcGlobal.size() >= fetchcount * 3) break; // we have enough
if (yacySearch.remainingWaiting(searchThreads) == 0) break; // we cannot expect more
// wait a little time ..
try {Thread.currentThread().sleep(100);} catch (InterruptedException e) {}
}
return rcGlobal.size();
}
public plasmaSearchResult order(long maxTime, int minEntries) throws IOException {
// we collect the urlhashes and construct a list with urlEntry objects
// attention: if minEntries is too high, this method will not terminate within the maxTime
plasmaWordIndexEntity searchResult = new plasmaWordIndexEntity(null);
searchResult.merge(rcLocal, -1);
searchResult.merge(rcGlobal, -1);
plasmaSearchResult acc = new plasmaSearchResult(query);
if (searchResult == null) return acc; // strange case where searchResult is not proper: acc is then empty
if (searchResult.size() == 0) return acc; // case that we have nothing to do
Enumeration e = searchResult.elements(true);
Iterator e = searchResult.elements(true);
plasmaWordIndexEntry entry;
long startCreateTime = System.currentTimeMillis();
plasmaCrawlLURL.Entry page;
try {
while (e.hasMoreElements()) {
while (e.hasNext()) {
if ((acc.sizeFetched() >= minEntries) &&
(System.currentTimeMillis() - startCreateTime >= maxTime)) break;
entry = (plasmaWordIndexEntry) e.nextElement();
entry = (plasmaWordIndexEntry) e.next();
// find the url entry
page = urlStore.getEntry(entry.getUrlHash());
// add a result
@ -111,10 +220,47 @@ public final class plasmaSearchEvent {
}
long startSortTime = System.currentTimeMillis();
acc.sortResults();
serverLog.logFine("PLASMA", "plasmaSearch.order: minEntries = " + minEntries + ", effectiveEntries = " + acc.sizeOrdered() + ", demanded Time = " + maxTime + ", effectiveTime = " + (System.currentTimeMillis() - startCreateTime) + ", createTime = " + (startSortTime - startCreateTime) + ", sortTime = " + (System.currentTimeMillis() - startSortTime));
serverLog.logFine("PLASMA", "plasmaSearchEvent.order: minEntries = " + minEntries + ", effectiveEntries = " + acc.sizeOrdered() + ", demanded Time = " + maxTime + ", effectiveTime = " + (System.currentTimeMillis() - startCreateTime) + ", createTime = " + (startSortTime - startCreateTime) + ", sortTime = " + (System.currentTimeMillis() - startSortTime));
return acc;
}
/**
 * Writes all buffered global search results (rcGlobal) into the local wordIndex.
 * Must be called after the search result has been computed; it is wise to run it
 * in a separate thread because it blocks until all remote search threads finish
 * (or a 90-second deadline passes, after which still-running threads are interrupted).
 */
public void flushResults() {
// put all new results into wordIndex
// this must be called after search results had been computed
// it is wise to call this within a separate thread because this method waits until all
// remote search threads have terminated
if (searchThreads == null) return; // no global search was started, nothing to flush
// wait until all threads are finished
int remaining;
long starttime = System.currentTimeMillis();
while ((remaining = yacySearch.remainingWaiting(searchThreads)) > 0) {
try {Thread.currentThread().sleep(5000);} catch (InterruptedException e) {}
if (System.currentTimeMillis() - starttime > 90000) {
// give up on slow peers after 90 seconds and interrupt their threads
yacySearch.interruptAlive(searchThreads);
serverLog.logFine("PLASMA", "SEARCH FLUSH: " + remaining + " PEERS STILL BUSY; ABANDONED");
break;
}
}
// now flush the rcGlobal into wordIndex:
// every collected URL entry is stored once under EACH query word hash,
// since the remote result was produced for the conjunction of all hashes
Iterator hashi = query.queryHashes.iterator();
String wordHash;
while (hashi.hasNext()) {
wordHash = (String) hashi.next();
Iterator i = rcGlobal.elements(true);
plasmaWordIndexEntry entry;
while (i.hasNext()) {
entry = (plasmaWordIndexEntry) i.next();
wordIndex.addEntries(plasmaWordIndexEntryContainer.instantContainer(wordHash, System.currentTimeMillis(), entry), false);
}
}
serverLog.logFine("PLASMA", "FINISHED FLUSHING " + rcGlobal.size() + " GLOBAL SEARCH RESULTS");
// finally delete the temporary index
// NOTE(review): rcGlobal is dropped without close(); presumably safe because it is
// a TMP entity (see plasmaWordIndexEntity(null)) — confirm no file handle is leaked
rcGlobal = null;
}
/*
public void preSearch() {
plasmaWordIndexEntity idx = null;

@ -60,12 +60,16 @@ public final class plasmaSearchResult {
private kelondroMScoreCluster ref; // reference score computation for the commonSense heuristic
private ArrayList results; // this is a buffer for plasmaWordIndexEntry + plasmaCrawlLURL.entry - objects
private plasmaSearchQuery query;
public int globalContributions;
public int localContributions;
public plasmaSearchResult(plasmaSearchQuery query) {
// create an empty result container for the given query:
// no pages accumulated yet, no reference scores, no buffered results,
// and zero contributions from either the local or the global search
this.query = query;
this.pageAcc = new TreeMap();
this.ref = new kelondroMScoreCluster();
this.results = new ArrayList();
this.localContributions = 0;
this.globalContributions = 0;
}
public plasmaSearchResult cloneSmart() {

@ -1440,10 +1440,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
serverObjects prop = new serverObjects();
try {
//char[] order = new char[2];
//if (query.order[0].equals("quality")) order[0] = plasmaSearchResult.O_QUALITY; else order[0] = plasmaSearchResult.O_AGE;
//if (query.order[1].equals("quality")) order[1] = plasmaSearchResult.O_QUALITY; else order[1] = plasmaSearchResult.O_AGE;
// filter out words that appear in bluelist
query.filterOut(blueList);
@ -1458,30 +1454,11 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
// preselect.start();
//}
// do global fetching
int globalresults = 0;
if (query.domType == plasmaSearchQuery.SEARCHDOM_GLOBALDHT) {
int fetchcount = ((int) (query.maximumTime / 1000L)) * 5; // number of wanted results until break in search
int fetchpeers = ((int) (query.maximumTime / 1000L)) * 2; // number of target peers; means 30 peers in 10 seconds
long fetchtime = query.maximumTime * 6 / 10; // time to waste
if (fetchpeers < 10) fetchpeers = 10;
if (fetchcount > query.wantedResults * 10) fetchcount = query.wantedResults * 10;
globalresults = yacySearch.searchHashes(query.queryHashes, urlPool.loadedURL, wordIndex, fetchcount, fetchpeers, urlBlacklist, snippetCache, fetchtime);
log.logFine("SEARCH TIME AFTER GLOBAL-TRIGGER TO " + fetchpeers + " PEERS: " + ((System.currentTimeMillis() - timestamp) / 1000) + " seconds");
}
prop.put("globalresults", globalresults); // the result are written to the local DB
// now search locally (the global results should be now in the local db)
long remainingTime = query.maximumTime - (System.currentTimeMillis() - timestamp);
// create a new search event
plasmaSearchEvent theSearch = new plasmaSearchEvent(query, log, wordIndex, urlPool.loadedURL, snippetCache);
plasmaWordIndexEntity idx = theSearch.search(remainingTime * 8 / 10);
log.logFine("SEARCH TIME AFTER FINDING " + idx.size() + " ELEMENTS: " + ((System.currentTimeMillis() - timestamp) / 1000) + " seconds");
plasmaSearchResult acc = theSearch.search();
remainingTime = query.maximumTime - (System.currentTimeMillis() - timestamp);
if (remainingTime < 500) remainingTime = 500;
if (remainingTime > 3000) remainingTime = 3000;
plasmaSearchResult acc = theSearch.order(idx, remainingTime, 10);
// fetch snippets
if (query.domType != plasmaSearchQuery.SEARCHDOM_GLOBALDHT)
snippetCache.fetch(acc.cloneSmart(), query.queryHashes, query.urlMask, 10);
log.logFine("SEARCH TIME AFTER ORDERING OF SEARCH RESULT: " + ((System.currentTimeMillis() - timestamp) / 1000) + " seconds");
@ -1492,7 +1469,8 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
prop.put("orderedcount", "0");
prop.put("linkcount", "0");
} else {
prop.put("totalcount", Integer.toString(idx.size()));
prop.put("globalresults", acc.globalContributions);
prop.put("totalcount", acc.globalContributions + acc.localContributions);
prop.put("orderedcount", Integer.toString(acc.sizeOrdered()));
int i = 0;
int p;
@ -1564,7 +1542,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
log.logFine("SEARCH TIME AFTER RESULT PREPARATION: " + ((System.currentTimeMillis() - timestamp) / 1000) + " seconds");
// calc some more cross-reference
remainingTime = query.maximumTime - (System.currentTimeMillis() - timestamp);
long remainingTime = query.maximumTime - (System.currentTimeMillis() - timestamp);
if (remainingTime < 0) remainingTime = 1000;
/*
while ((acc.hasMoreElements()) && (((time + timestamp) < System.currentTimeMillis()))) {
@ -1596,7 +1574,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
prop.get("orderedcount", "0") + " links ordered, " +
prop.get("linkcount", "?") + " links selected, " +
((System.currentTimeMillis() - timestamp) / 1000) + " seconds");
if (idx != null) idx.close();
return prop;
} catch (IOException e) {
return null;
@ -1614,10 +1591,10 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
log.logInfo("INIT HASH SEARCH: " + query.queryHashes + " - " + query.wantedResults + " links");
long timestamp = System.currentTimeMillis();
plasmaSearchEvent theSearch = new plasmaSearchEvent(query, log, wordIndex, urlPool.loadedURL, snippetCache);
plasmaWordIndexEntity idx = theSearch.search(query.maximumTime * 8 / 10);
int idxc = theSearch.localSearch(query.maximumTime * 8 / 10);
long remainingTime = query.maximumTime - (System.currentTimeMillis() - timestamp);
if (remainingTime < 500) remainingTime = 500;
plasmaSearchResult acc = theSearch.order(idx, remainingTime, 10);
plasmaSearchResult acc = theSearch.order(remainingTime, 10);
// result is a List of urlEntry elements
if (acc == null) {
@ -1665,11 +1642,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
prop.put("fwrec", ""); // peers that would have helped to construct this result (recommendations)
// log
log.logInfo("EXIT HASH SEARCH: " + query.queryHashes + " - " +
((idx == null) ? "0" : (""+idx.size())) + " links found, " +
log.logInfo("EXIT HASH SEARCH: " + query.queryHashes + " - " + idxc + " links found, " +
prop.get("linkcount", "?") + " links selected, " +
((System.currentTimeMillis() - timestamp) / 1000) + " seconds");
if (idx != null) idx.close();
return prop;
} catch (IOException e) {
return null;

@ -570,10 +570,10 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
return -container.size();
} else {
// the combined container will fit, read the container
Enumeration entries = entity.elements(true);
Iterator entries = entity.elements(true);
plasmaWordIndexEntry entry;
while (entries.hasMoreElements()) {
entry = (plasmaWordIndexEntry) entries.nextElement();
while (entries.hasNext()) {
entry = (plasmaWordIndexEntry) entries.next();
container.add(new plasmaWordIndexEntry[]{entry}, System.currentTimeMillis());
}
// we have read all elements, now delete the entity

@ -301,7 +301,7 @@ public final class plasmaWordIndexDistribution {
int currOpenFiles = 0;
Iterator wordHashIterator = this.wordIndex.wordHashes(hash, true, true);
plasmaWordIndexEntity indexEntity, tmpEntity;
Enumeration urlEnum;
Iterator urlIter;
Iterator hashIter;
plasmaWordIndexEntry indexEntry;
plasmaCrawlLURL.Entry lurl;
@ -322,10 +322,10 @@ public final class plasmaWordIndexDistribution {
// take the whole entity
try {
// fist check if we know all urls
urlEnum = indexEntity.elements(true);
urlIter = indexEntity.elements(true);
unknownURLEntries.clear();
while (urlEnum.hasMoreElements()) {
indexEntry = (plasmaWordIndexEntry) urlEnum.nextElement();
while (urlIter.hasNext()) {
indexEntry = (plasmaWordIndexEntry) urlIter.next();
lurl = this.urlPool.loadedURL.getEntry(indexEntry.getUrlHash());
if ((lurl == null) || (lurl.toString() == null)) {
unknownURLEntries.add(indexEntry.getUrlHash());
@ -361,10 +361,10 @@ public final class plasmaWordIndexDistribution {
// make an on-the-fly entity and insert values
tmpEntity = new plasmaWordIndexEntity(indexEntity.wordHash());
try {
urlEnum = indexEntity.elements(true);
urlIter = indexEntity.elements(true);
unknownURLEntries.clear();
while ((urlEnum.hasMoreElements()) && (count > 0)) {
indexEntry = (plasmaWordIndexEntry) urlEnum.nextElement();
while ((urlIter.hasNext()) && (count > 0)) {
indexEntry = (plasmaWordIndexEntry) urlIter.next();
lurl = this.urlPool.loadedURL.getEntry(indexEntry.getUrlHash());
if (lurl == null) {
unknownURLEntries.add(indexEntry.getUrlHash());
@ -410,7 +410,7 @@ public final class plasmaWordIndexDistribution {
boolean deleteTransferIndexes(plasmaWordIndexEntity[] indexEntities) throws IOException {
String wordhash;
Enumeration urlEnum;
Iterator urlIter;
plasmaWordIndexEntry indexEntry;
plasmaWordIndexEntity indexEntity;
String[] urlHashes;
@ -421,9 +421,9 @@ public final class plasmaWordIndexDistribution {
// delete entries separately
int c = 0;
urlHashes = new String[indexEntities[i].size()];
urlEnum = indexEntities[i].elements(true);
while (urlEnum.hasMoreElements()) {
indexEntry = (plasmaWordIndexEntry) urlEnum.nextElement();
urlIter = indexEntities[i].elements(true);
while (urlIter.hasNext()) {
indexEntry = (plasmaWordIndexEntry) urlIter.next();
urlHashes[c++] = indexEntry.getUrlHash();
}
wordIndex.removeEntries(indexEntities[i].wordHash(), urlHashes, true);

@ -43,7 +43,6 @@ package de.anomic.plasma;
import java.io.File;
import java.io.IOException;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.TreeMap;
import java.util.Set;
@ -231,12 +230,12 @@ public final class plasmaWordIndexEntity {
}
}
public Enumeration elements(boolean up) {
public Iterator elements(boolean up) {
// returns an enumeration of plasmaWordIndexEntry objects
if (theTmpMap == null) return new dbenum(up); else return new tmpenum(up);
}
public final class dbenum implements Enumeration {
public final class dbenum implements Iterator {
Iterator i;
public dbenum(boolean up) {
try {
@ -247,10 +246,10 @@ public final class plasmaWordIndexEntity {
i = null;
}
}
public boolean hasMoreElements() {
public boolean hasNext() {
return (i != null) && (i.hasNext());
}
public Object nextElement() {
public Object next() {
if (i == null) return null;
try {
byte[][] n = ((kelondroRecords.Node) i.next()).getValues();
@ -263,22 +262,28 @@ public final class plasmaWordIndexEntity {
throw new RuntimeException("dbenum: " + e.getMessage());
}
}
public void remove() {
throw new UnsupportedOperationException();
}
}
public final class tmpenum implements Enumeration {
public final class tmpenum implements Iterator {
final TreeMap searchTree;
boolean up;
public tmpenum(boolean up) {
this.up = up;
searchTree = (TreeMap) theTmpMap.clone(); // a shallow clone that is destroyed during search
}
public boolean hasMoreElements() {
public boolean hasNext() {
return searchTree.size() > 0;
}
public Object nextElement() {
public Object next() {
Object urlHash = (up) ? searchTree.firstKey() : searchTree.lastKey();
plasmaWordIndexEntry entry = (plasmaWordIndexEntry) searchTree.remove(urlHash);
return entry;
}
public void remove() {
throw new UnsupportedOperationException();
}
}
public String toString() {
@ -294,6 +299,17 @@ public final class plasmaWordIndexEntity {
return l;
}
public void merge(plasmaWordIndexEntity otherEntity, long time) throws IOException {
// Copy every entry of otherEntity into this entity, stopping early when the
// given time budget (milliseconds) is exhausted. A time of -1 disables the timeout.
long deadline = (time == -1) ? Long.MAX_VALUE : System.currentTimeMillis() + time;
for (Iterator it = otherEntity.elements(true); it.hasNext() && (System.currentTimeMillis() < deadline); ) {
addEntry((plasmaWordIndexEntry) it.next());
}
}
public static plasmaWordIndexEntity joinEntities(Set entities, long time) throws IOException {
long stamp = System.currentTimeMillis();
@ -366,12 +382,12 @@ public final class plasmaWordIndexEntity {
private static plasmaWordIndexEntity joinConstructiveByTest(plasmaWordIndexEntity small, plasmaWordIndexEntity large, long time) throws IOException {
System.out.println("DEBUG: JOIN METHOD BY TEST");
plasmaWordIndexEntity conj = new plasmaWordIndexEntity(null); // start with empty search result
Enumeration se = small.elements(true);
Iterator se = small.elements(true);
plasmaWordIndexEntry ie;
long stamp = System.currentTimeMillis();
try {
while ((se.hasMoreElements()) && ((System.currentTimeMillis() - stamp) < time)) {
ie = (plasmaWordIndexEntry) se.nextElement();
while ((se.hasNext()) && ((System.currentTimeMillis() - stamp) < time)) {
ie = (plasmaWordIndexEntry) se.next();
if (large.contains(ie)) conj.addEntry(ie);
}
} catch (kelondroException e) {
@ -385,21 +401,21 @@ public final class plasmaWordIndexEntity {
private static plasmaWordIndexEntity joinConstructiveByEnumeration(plasmaWordIndexEntity i1, plasmaWordIndexEntity i2, long time) throws IOException {
System.out.println("DEBUG: JOIN METHOD BY ENUMERATION");
plasmaWordIndexEntity conj = new plasmaWordIndexEntity(null); // start with empty search result
Enumeration e1 = i1.elements(true);
Enumeration e2 = i2.elements(true);
Iterator e1 = i1.elements(true);
Iterator e2 = i2.elements(true);
int c;
if ((e1.hasMoreElements()) && (e2.hasMoreElements())) {
if ((e1.hasNext()) && (e2.hasNext())) {
plasmaWordIndexEntry ie1;
plasmaWordIndexEntry ie2;
try {
ie1 = (plasmaWordIndexEntry) e1.nextElement();
ie1 = (plasmaWordIndexEntry) e1.next();
} catch (kelondroException e) {
//serverLog.logSevere("PLASMA", "joinConstructiveByEnumeration: Database corrupt 1 (" + e.getMessage() + "), deleting index");
i1.deleteComplete();
return conj;
}
try {
ie2 = (plasmaWordIndexEntry) e2.nextElement();
ie2 = (plasmaWordIndexEntry) e2.next();
} catch (kelondroException e) {
//serverLog.logSevere("PLASMA", "joinConstructiveByEnumeration: Database corrupt 2 (" + e.getMessage() + "), deleting index");
i2.deleteComplete();
@ -410,7 +426,7 @@ public final class plasmaWordIndexEntity {
c = ie1.getUrlHash().compareTo(ie2.getUrlHash());
if (c < 0) {
try {
if (e1.hasMoreElements()) ie1 = (plasmaWordIndexEntry) e1.nextElement(); else break;
if (e1.hasNext()) ie1 = (plasmaWordIndexEntry) e1.next(); else break;
} catch (kelondroException e) {
//serverLog.logSevere("PLASMA", "joinConstructiveByEnumeration: Database 1 corrupt (" + e.getMessage() + "), deleting index");
i1.deleteComplete();
@ -418,7 +434,7 @@ public final class plasmaWordIndexEntity {
}
} else if (c > 0) {
try {
if (e2.hasMoreElements()) ie2 = (plasmaWordIndexEntry) e2.nextElement(); else break;
if (e2.hasNext()) ie2 = (plasmaWordIndexEntry) e2.next(); else break;
} catch (kelondroException e) {
//serverLog.logSevere("PLASMA", "joinConstructiveByEnumeration: Database 2 corrupt (" + e.getMessage() + "), deleting index");
i2.deleteComplete();
@ -428,14 +444,14 @@ public final class plasmaWordIndexEntity {
// we have found the same urls in different searches!
conj.addEntry(ie1);
try {
if (e1.hasMoreElements()) ie1 = (plasmaWordIndexEntry) e1.nextElement(); else break;
if (e1.hasNext()) ie1 = (plasmaWordIndexEntry) e1.next(); else break;
} catch (kelondroException e) {
//serverLog.logSevere("PLASMA", "joinConstructiveByEnumeration: Database 1 corrupt (" + e.getMessage() + "), deleting index");
i1.deleteComplete();
break;
}
try {
if (e2.hasMoreElements()) ie2 = (plasmaWordIndexEntry) e2.nextElement(); else break;
if (e2.hasNext()) ie2 = (plasmaWordIndexEntry) e2.next(); else break;
} catch (kelondroException e) {
//serverLog.logSevere("PLASMA", "joinConstructiveByEnumeration: Database 2 corrupt (" + e.getMessage() + "), deleting index");
i2.deleteComplete();

@ -49,6 +49,8 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import de.anomic.http.httpc;
import de.anomic.plasma.plasmaCrawlLURL;
import de.anomic.plasma.plasmaSnippetCache;
@ -270,11 +272,9 @@ public final class yacyClient {
}
}
public static int search(String wordhashes, int count, boolean global,
yacySeed targetPeer, plasmaCrawlLURL urlManager,
plasmaWordIndex wordIndex, plasmaURLPattern blacklist,
plasmaSnippetCache snippets,
long duetime) {
public static int search(String wordhashes, int count, boolean global, yacySeed targetPeer,
plasmaCrawlLURL urlManager, plasmaWordIndexEntity entityCache,
plasmaURLPattern blacklist, plasmaSnippetCache snippets, long duetime) {
// send a search request to peer with remote Hash
// this mainly converts the words into word hashes
@ -374,7 +374,7 @@ public final class yacyClient {
}
// finally insert the containers to the index
for (int m = 0; m < words; m++) { wordIndex.addEntries(container[m], true); }
for (int m = 0; m < words; m++) { entityCache.addEntries(container[m]); }
// generate statistics
long searchtime;
@ -383,7 +383,7 @@ public final class yacyClient {
} catch (NumberFormatException e) {
searchtime = totalrequesttime;
}
yacyCore.log.logFine("yacyClient.search: processed " + results + " links from peer " + targetPeer.hash + ", score=" + targetPeer.selectscore + ", DHTdist=" + yacyDHTAction.dhtDistance(targetPeer.hash, wordhashes) + ", duetime=" + duetime + ", searchtime=" + searchtime + ", netdelay=" + (totalrequesttime - searchtime) + ", references=" + result.get("references"));
yacyCore.log.logFine("yacyClient.search: processed " + results + " links from peer " + targetPeer.hash + ":" + targetPeer.getName() + ", score=" + targetPeer.selectscore + ", DHTdist=" + yacyDHTAction.dhtDistance(targetPeer.hash, wordhashes) + ", duetime=" + duetime + ", searchtime=" + searchtime + ", netdelay=" + (totalrequesttime - searchtime) + ", references=" + result.get("references"));
return results;
} catch (Exception e) {
yacyCore.log.logSevere("yacyClient.search error: '" + targetPeer.get("Name", "anonymous") + "' failed - " + e);
@ -596,12 +596,12 @@ public final class yacyClient {
post.put("wordc", Integer.toString(indexes.length));
int indexcount = 0;
final StringBuffer entrypost = new StringBuffer(indexes.length*73);
Enumeration eenum;
Iterator eenum;
plasmaWordIndexEntry entry;
for (int i = 0; i < indexes.length; i++) {
eenum = indexes[i].elements(true);
while (eenum.hasMoreElements()) {
entry = (plasmaWordIndexEntry) eenum.nextElement();
while (eenum.hasNext()) {
entry = (plasmaWordIndexEntry) eenum.next();
entrypost.append(indexes[i].wordHash())
.append(entry.toExternalForm())
.append(serverCore.crlfString);

@ -51,8 +51,8 @@ import java.util.HashMap;
import de.anomic.kelondro.kelondroMScoreCluster;
import de.anomic.plasma.plasmaCrawlLURL;
import de.anomic.plasma.plasmaURLPattern;
import de.anomic.plasma.plasmaWordIndex;
import de.anomic.plasma.plasmaSnippetCache;
import de.anomic.plasma.plasmaWordIndexEntity;
import de.anomic.server.logging.serverLog;
public class yacySearch extends Thread {
@ -61,7 +61,7 @@ public class yacySearch extends Thread {
final private int count;
final private boolean global;
final private plasmaCrawlLURL urlManager;
final private plasmaWordIndex wordIndex;
final private plasmaWordIndexEntity entityCache;
final private plasmaURLPattern blacklist;
final private plasmaSnippetCache snippetCache;
final private yacySeed targetPeer;
@ -69,13 +69,13 @@ public class yacySearch extends Thread {
final private long duetime;
public yacySearch(Set wordhashes, int count, boolean global, yacySeed targetPeer,
plasmaCrawlLURL urlManager, plasmaWordIndex wordIndex, plasmaURLPattern blacklist, plasmaSnippetCache snippetCache, long duetime) {
plasmaCrawlLURL urlManager, plasmaWordIndexEntity entityCache, plasmaURLPattern blacklist, plasmaSnippetCache snippetCache, long duetime) {
super("yacySearch_" + targetPeer.getName());
this.wordhashes = wordhashes;
this.count = count;
this.global = global;
this.urlManager = urlManager;
this.wordIndex = wordIndex;
this.entityCache = entityCache;
this.blacklist = blacklist;
this.snippetCache = snippetCache;
this.targetPeer = targetPeer;
@ -84,7 +84,7 @@ public class yacySearch extends Thread {
}
public void run() {
this.links = yacyClient.search(set2string(wordhashes), count, global, targetPeer, urlManager, wordIndex, blacklist, snippetCache, duetime);
this.links = yacyClient.search(set2string(wordhashes), count, global, targetPeer, urlManager, entityCache, blacklist, snippetCache, duetime);
if (links != 0) {
//yacyCore.log.logInfo("REMOTE SEARCH - remote peer " + targetPeer.hash + ":" + targetPeer.getName() + " contributed " + links + " links for word hash " + wordhashes);
yacyCore.seedDB.mySeed.incRI(links);
@ -165,65 +165,49 @@ public class yacySearch extends Thread {
return result;
}
public static int searchHashes(Set wordhashes, plasmaCrawlLURL urlManager, plasmaWordIndex wordIndex,
int count, int targets, plasmaURLPattern blacklist, plasmaSnippetCache snippetCache, long waitingtime) {
public static yacySearch[] searchHashes(Set wordhashes, plasmaCrawlLURL urlManager, plasmaWordIndexEntity entityCache,
int count, int targets, plasmaURLPattern blacklist, plasmaSnippetCache snippetCache, long duetime) {
// check own peer status
if (yacyCore.seedDB.mySeed == null || yacyCore.seedDB.mySeed.getAddress() == null) { return 0; }
// start delay control
final long start = System.currentTimeMillis();
// set a duetime for clients
long duetime = waitingtime - 4000; // subtract network traffic overhead, guessed 4 seconds
if (duetime < 1000) { duetime = 1000; }
if (yacyCore.seedDB.mySeed == null || yacyCore.seedDB.mySeed.getAddress() == null) { return null; }
// prepare seed targets and threads
//Set wordhashes = plasmaSearch.words2hashes(querywords);
final yacySeed[] targetPeers = selectPeers(wordhashes, targets);
if (targetPeers == null) { return 0; }
if (targetPeers == null) return null;
targets = targetPeers.length;
if (targets == 0) { return 0; }
if (targets == 0) return null;
yacySearch[] searchThreads = new yacySearch[targets];
for (int i = 0; i < targets; i++) {
searchThreads[i]= new yacySearch(wordhashes, count, true, targetPeers[i],
urlManager, wordIndex, blacklist, snippetCache, duetime);
urlManager, entityCache, blacklist, snippetCache, duetime);
searchThreads[i].start();
try {Thread.currentThread().sleep(20);} catch (InterruptedException e) {}
if ((System.currentTimeMillis() - start) > waitingtime) {
targets = i + 1;
break;
}
}
int c;
// wait until wanted delay passed or wanted result appeared
boolean anyIdle = true;
while ((anyIdle) && ((System.currentTimeMillis() - start) < waitingtime)) {
// check if all threads have been finished or results so far are enough
c = 0;
anyIdle = false;
for (int i = 0; i < targets; i++) {
if (searchThreads[i].links() < 0) {
anyIdle = true;
} else {
c = c + searchThreads[i].links();
}
}
if ((c >= count * 3) && ((System.currentTimeMillis() - start) > (waitingtime * 2 / 3))) {
yacyCore.log.logFine("DEBUG yacySearch: c=" + c + ", count=" + count + ", waitingtime=" + waitingtime);
break; // we have enough
}
if (c >= count * 5) { break; }
// wait a little time ..
try {Thread.currentThread().sleep(100);} catch (InterruptedException e) {}
}
// collect results
c = 0;
for (int i = 0; i < targets; i++) {
c = c + ((searchThreads[i].links() > 0) ? searchThreads[i].links() : 0);
return searchThreads;
}
public static int remainingWaiting(yacySearch[] searchThreads) {
int alive = 0;
for (int i = 0; i < searchThreads.length; i++) {
if (searchThreads[i].isAlive()) alive++;
}
return c;
return alive;
}
public static int collectedLinks(yacySearch[] searchThreads) {
int links = 0;
for (int i = 0; i < searchThreads.length; i++) {
if (!(searchThreads[i].isAlive())) links += searchThreads[i].links;
}
return links;
}
public static void interruptAlive(yacySearch[] searchThreads) {
for (int i = 0; i < searchThreads.length; i++) {
if (searchThreads[i].isAlive()) searchThreads[i].interrupt();
}
}
}

@ -750,16 +750,16 @@ public final class yacy {
plasmaWordIndexEntryContainer newContainer = new plasmaWordIndexEntryContainer(wordHash,importWordIdxEntity.size());
// the combined container will fit, read the container
Enumeration importWordIdxEntries = importWordIdxEntity.elements(true);
Iterator importWordIdxEntries = importWordIdxEntity.elements(true);
plasmaWordIndexEntry importWordIdxEntry;
while (importWordIdxEntries.hasMoreElements()) {
while (importWordIdxEntries.hasNext()) {
// testing if import process was aborted
if (Thread.interrupted()) break;
// getting next word index entry
entryCounter++;
importWordIdxEntry = (plasmaWordIndexEntry) importWordIdxEntries.nextElement();
importWordIdxEntry = (plasmaWordIndexEntry) importWordIdxEntries.next();
String urlHash = importWordIdxEntry.getUrlHash();
if ((importUrlDB.exists(urlHash)) && (!homeUrlDB.exists(urlHash))) {
urlCounter++;
@ -866,10 +866,10 @@ public final class yacy {
wordIdxEntity = wordIndex.getEntity(wordhash, true);
// the combined container will fit, read the container
Enumeration wordIdxEntries = wordIdxEntity.elements(true);
Iterator wordIdxEntries = wordIdxEntity.elements(true);
plasmaWordIndexEntry wordIdxEntry;
while (wordIdxEntries.hasMoreElements()) {
wordIdxEntry = (plasmaWordIndexEntry) wordIdxEntries.nextElement();
while (wordIdxEntries.hasNext()) {
wordIdxEntry = (plasmaWordIndexEntry) wordIdxEntries.next();
String urlHash = wordIdxEntry.getUrlHash();
if ((currentUrlDB.exists(urlHash)) && (!minimizedUrlDB.exists(urlHash))) {
urlCounter++;

Loading…
Cancel
Save