removed synchronization and thread blocking

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@63 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 20 years ago
parent 3756e6d20f
commit e25f2354c2

@ -45,7 +45,7 @@
# Contributions and changes to the program code must be marked as such.
# define variables
version='0.366'
version='0.367'
datestr=`date +%Y%m%d`
#release='yacy_v'$version'_'$datestr
release='yacy_dev_v'$version'_'$datestr

@ -284,7 +284,7 @@ public class plasmaCrawlLURL extends plasmaURL {
if (urle != null) try {
initiatorSeed = yacyCore.seedDB.getConnected(initiatorHash);
executorSeed = yacyCore.seedDB.getConnected(executorHash);
cachepath = urle.url().toString().substring(7);
cachepath = (urle.url() == null) ? "-not-cached-" : urle.url().toString().substring(7);
if (cachepath.endsWith("/")) cachepath = cachepath + "ndx";
prop.put("table_indexed_" + c + "_dark", (dark) ? 1 : 0);
prop.put("table_indexed_" + c + "_feedbackpage", feedbackpage);

@ -104,7 +104,7 @@ public final class plasmaCrawlWorker extends Thread {
this.log = log;
}
public synchronized void execute(plasmaCrawlLoaderMessage theMsg) {
public void execute(plasmaCrawlLoaderMessage theMsg) {
this.theMsg = theMsg;
this.url = theMsg.url;

@ -73,17 +73,19 @@ public class plasmaSearch {
return (int) ((modified.getTime() / 86400000) % 262144);
}
public synchronized void addWordIndex(URL url, String urlHash, Date urlModified, int quality, String wordHash, int wordCount, int posintext, int posinphrase, int posofphraseint, String language, char doctype, boolean local) {
public void addWordIndex(URL url, String urlHash, Date urlModified, int quality, String wordHash, int wordCount, int posintext, int posinphrase, int posofphraseint, String language, char doctype, boolean local) {
// this is called by the remote search procedure when a new index arrives from remote
plasmaWordIndexEntry entry = new plasmaWordIndexEntry(urlHash, wordCount, posintext, posinphrase, posofphraseint,
calcVirtualAge(urlModified), quality, language, doctype, local);
plasmaWordIndexEntry entry = new plasmaWordIndexEntry(urlHash, wordCount,
posintext, posinphrase, posofphraseint,
calcVirtualAge(urlModified), quality,
language, doctype, local);
try {
wordIndex.addEntry(wordHash, entry);
} catch (IOException e) {}
// System.out.println("* received one index entry for URL: " + url); // debug
}
public synchronized int addPageIndex(URL url, String urlHash, Date urlModified, plasmaCondenser condenser,
public int addPageIndex(URL url, String urlHash, Date urlModified, plasmaCondenser condenser,
String language, char doctype) {
// this is called by the switchboard to put in a new page into the index
// use all the words in one condenser object to simultanous create index entries
@ -132,12 +134,12 @@ public class plasmaSearch {
return hashes;
}
public synchronized plasmaWordIndexEntity searchWords(Set words, long time) throws IOException {
public plasmaWordIndexEntity searchWords(Set words, long time) throws IOException {
// search for the set of words and return an array of urlEntry elements
return searchHashes(words2hashes(words), time);
}
public synchronized plasmaWordIndexEntity searchHashes(Set hashes, long time) throws IOException {
public plasmaWordIndexEntity searchHashes(Set hashes, long time) throws IOException {
// search for the set of hashes and return an array of urlEntry elements
long stamp = System.currentTimeMillis();
@ -184,7 +186,7 @@ public class plasmaSearch {
return l;
}
private synchronized plasmaWordIndexEntity joinConstructive(plasmaWordIndexEntity i1, plasmaWordIndexEntity i2, long time) throws IOException {
private plasmaWordIndexEntity joinConstructive(plasmaWordIndexEntity i1, plasmaWordIndexEntity i2, long time) throws IOException {
if ((i1 == null) || (i2 == null)) return null;
if ((i1.size() == 0) || (i2.size() == 0)) return new plasmaWordIndexEntity(null);
@ -205,7 +207,7 @@ public class plasmaSearch {
}
}
private synchronized plasmaWordIndexEntity joinConstructiveByTest(plasmaWordIndexEntity small, plasmaWordIndexEntity large, long time) throws IOException {
private plasmaWordIndexEntity joinConstructiveByTest(plasmaWordIndexEntity small, plasmaWordIndexEntity large, long time) throws IOException {
System.out.println("DEBUG: JOIN METHOD BY TEST");
plasmaWordIndexEntity conj = new plasmaWordIndexEntity(null); // start with empty search result
Enumeration se = small.elements(true);
@ -224,7 +226,7 @@ public class plasmaSearch {
return conj;
}
private synchronized plasmaWordIndexEntity joinConstructiveByEnumeration(plasmaWordIndexEntity i1, plasmaWordIndexEntity i2, long time) throws IOException {
private plasmaWordIndexEntity joinConstructiveByEnumeration(plasmaWordIndexEntity i1, plasmaWordIndexEntity i2, long time) throws IOException {
System.out.println("DEBUG: JOIN METHOD BY ENUMERATION");
plasmaWordIndexEntity conj = new plasmaWordIndexEntity(null); // start with empty search result
Enumeration e1 = i1.elements(true);
@ -289,34 +291,35 @@ public class plasmaSearch {
return conj;
}
public synchronized plasmaSearch.result order(plasmaWordIndexEntity searchResult, Set searchhashes, Set stopwords, char[] priority, long maxTime, int minEntries) throws IOException {
public plasmaSearch.result order(plasmaWordIndexEntity searchResult, Set searchhashes, Set stopwords, char[] priority, long maxTime, int minEntries) throws IOException {
// we collect the urlhashes from it and construct a List with urlEntry objects
plasmaSearch.result acc = new result(searchhashes, stopwords, priority);
// attention: if minEntries is too high, this method will not terminate within the maxTime
plasmaSearch.result acc = new result(searchhashes, stopwords, priority);
if (searchResult == null) return acc; // strange case where searchResult is not proper: acc is then empty
Enumeration e = searchResult.elements(true);
plasmaWordIndexEntry entry;
long startTime = System.currentTimeMillis();
//String headline;
while ((e.hasMoreElements()) && ((acc.sizeFetched() < minEntries) || (System.currentTimeMillis() - startTime < maxTime))) {
long startCreateTime = System.currentTimeMillis();
while ((e.hasMoreElements()) &&
((acc.sizeFetched() < minEntries) || (System.currentTimeMillis() - startCreateTime < maxTime))) {
entry = (plasmaWordIndexEntry) e.nextElement();
//headline = entry.
acc.addResult(entry);
}
long startSortTime = System.currentTimeMillis();
acc.sortResults();
System.out.println("plasmaSearch.order: minEntries = " + minEntries + ", effectiveEntries = " + acc.sizeOrdered() + ", demanded Time = " + maxTime + ", effectiveTime = " + (System.currentTimeMillis() - startTime));
System.out.println("plasmaSearch.order: minEntries = " + minEntries + ", effectiveEntries = " + acc.sizeOrdered() + ", demanded Time = " + maxTime + ", effectiveTime = " + (System.currentTimeMillis() - startCreateTime) + ", createTime = " + (startSortTime - startCreateTime) + ", sortTime = " + (System.currentTimeMillis() - startSortTime));
return acc;
}
public class result /*implements Enumeration*/ {
TreeMap pageAcc; // key = order hash; value = plasmaLURL.entry
kelondroMScoreCluster ref;
Set searchhashes;
Set stopwords;
char[] order;
ArrayList results;
TreeMap pageAcc; // key = order hash; value = plasmaLURL.entry
kelondroMScoreCluster ref; // reference score computation for the commonSense heuristic
Set searchhashes; // hashes that are searched here
Set stopwords; // words that are excluded from the commonSense heuristic
char[] order; // order of heuristics
ArrayList results; // this is a buffer for plasmaWordIndexEntry + plasmaCrawlLURL.entry - objects
public result(Set searchhashes, Set stopwords, char[] order) {
this.pageAcc = new TreeMap();
@ -343,49 +346,7 @@ public class plasmaSearch {
Object top = pageAcc.lastKey();
return (plasmaCrawlLURL.entry) pageAcc.remove(top);
}
/*
protected void putElement(plasmaWordIndexEntry indexEntry) {
// find the url entry
plasmaCrawlLURL.entry page = urlStore.getEntry(indexEntry.getUrlHash());
// check if the url exists; the url may not exist in case it was deleted
// somewhere else (i.e. manually through interface etc.)
if (page == null) return;
URL url = page.url();
String descr = page.descr();
if ((url == null) || (descr == null)) return;
// apply pre-calculated order attributes
long ranking = 0;
if (order[0] == O_QUALITY) ranking = 4096 * indexEntry.getQuality();
else if (order[0] == O_AGE) ranking = 4096 * indexEntry.getVirtualAge();
if (order[1] == O_QUALITY) ranking += indexEntry.getQuality();
else if (order[1] == O_AGE) ranking += indexEntry.getVirtualAge();
// apply query-in-result matching
long inc = 4096 * 4096;
String[] urlcomps = url.toString().split(splitrex);
//printSplitLog(url.toString(), urlcomps);
Set urlcomph = words2hashes(urlcomps);
String[] descrcomps = descr.split(splitrex);
//printSplitLog(descr, descrcomps);
Set descrcomph = words2hashes(descrcomps);
Iterator i = searchhashes.iterator();
String queryhash;
while (i.hasNext()) {
queryhash = (String) i.next();
if (urlcomph.contains(queryhash)) ranking += inc;
if (descrcomph.contains(queryhash)) ranking += 10 * inc;
}
// insert value
//System.out.println("Ranking " + ranking + " for url " + url.toString());
pageAcc.put(serverCodings.encodeHex(ranking, 16) + indexEntry.getUrlHash(), page);
addScoreFiltered(urlcomps);
addScoreFiltered(descrcomps);
}
*/
protected void addResult(plasmaWordIndexEntry indexEntry) {
// this does 3 things:
// 1. simply store indexEntry and page to a cache
@ -399,12 +360,8 @@ public class plasmaSearch {
URL url = page.url();
String descr = page.descr();
if ((url == null) || (descr == null)) return;
String[] urlcomps = url.toString().split(splitrex);
//printSplitLog(url.toString(), urlcomps);
String[] descrcomps = descr.split(splitrex);
//printSplitLog(descr, descrcomps);
String[] urlcomps = url.toString().split(splitrex); // word components of the url
String[] descrcomps = descr.split(splitrex); // words in the description
// store everything
Object[] resultVector = new Object[] {indexEntry, page, urlcomps, descrcomps};
@ -416,6 +373,10 @@ public class plasmaSearch {
}
protected void sortResults() {
// finally sort the results
// create a commonSense - set that represents a set of words that is
// treated as 'typical' for this search request
Object[] references = getReferences(16);
Set commonSense = new HashSet();
for (int i = 0; i < references.length; i++) commonSense.add((String) references[i]);
@ -461,9 +422,13 @@ public class plasmaSearch {
//System.out.println("Ranking " + ranking + " for url " + url.toString());
pageAcc.put(serverCodings.encodeHex(ranking, 16) + indexEntry.getUrlHash(), page);
}
// flush memory
results = null;
}
public Object[] getReferences(int count) {
// create a list of words that had been computed by statistics over all
// words that appeared in the url or the description of all urls
return ref.getScores(count, false, 2, Integer.MAX_VALUE);
}

@ -408,18 +408,23 @@ public class plasmaSwitchboard extends serverAbstractSwitch implements serverSwi
processStack.addLast(entry);
}
public synchronized boolean deQueue() {
if (serverJobs < 6) {
if (processStack.size() > 0) {
processResourceStack((plasmaHTCache.Entry) processStack.removeFirst());
return true;
}
} else {
//if (processStack.size() > 0) {
log.logDebug("DEQUEUE: serverJobs=" + serverJobs + " 'busy' - no dequeueing (processStack=" + processStack.size() + ", localStackSize=" + noticeURL.localStackSize() + ", remoteStackSize=" + noticeURL.remoteStackSize() + ")");
//}
}
return false;
public boolean deQueue() {
// work off fresh entries from the proxy or from the crawler
synchronized (processStack) {
if (processStack.size() == 0) return false; // noting to do
// in case that the server is very busy we do not work off the queue too fast
if (serverJobs > 10) try {Thread.currentThread().sleep(10 * serverJobs);} catch (InterruptedException e) {}
// do one processing step
log.logDebug("DEQUEUE: serverJobs=" + serverJobs +
", processStack=" + processStack.size() +
", localStackSize=" + noticeURL.localStackSize() +
", remoteStackSize=" + noticeURL.remoteStackSize());
processResourceStack((plasmaHTCache.Entry) processStack.removeFirst());
}
return true;
}
public int cleanupJobSize() {
@ -457,16 +462,25 @@ public class plasmaSwitchboard extends serverAbstractSwitch implements serverSwi
}
public boolean localCrawlJob() {
if ((serverJobs < 6) &&
(processStack.size() < crawlSlots) &&
(noticeURL.localStackSize() > 0) &&
(cacheLoader.size() < crawlSlots)) {
// local crawl (may start a global crawl)
plasmaCrawlNURL.entry nex = noticeURL.localPop();
processCrawling(nex, nex.initiator());
return true;
}
return false;
if (noticeURL.localStackSize() == 0) return false;
if (processStack.size() >= crawlSlots) {
log.logDebug("LocalCrawl: too many processes in queue, dismissed (" +
"processStack=" + processStack.size() + ")");
return false;
}
if (cacheLoader.size() >= crawlSlots) {
log.logDebug("LocalCrawl: too many loader in queue, dismissed (" +
"cacheLoader=" + cacheLoader.size() + ")");
return false;
}
// if the server is busy, we do crawling more slowly
if (serverJobs > 3) try {Thread.currentThread().sleep(100 * serverJobs);} catch (InterruptedException e) {}
// do a local crawl (may start a global crawl)
plasmaCrawlNURL.entry nex = noticeURL.localPop();
processCrawling(nex, nex.initiator());
return true;
}
public int globalCrawlJobSize() {
@ -474,16 +488,30 @@ public class plasmaSwitchboard extends serverAbstractSwitch implements serverSwi
}
public boolean globalCrawlJob() {
if ((serverJobs < 2) &&
(processStack.size() == 0) &&
(noticeURL.localStackSize() == 0) &&
(noticeURL.remoteStackSize() > 0)) {
// we don't want to crawl a global URL globally, since WE are the global part. (from this point of view)
plasmaCrawlNURL.entry nex = noticeURL.remotePop();
processCrawling(nex, nex.initiator());
return true;
}
return false;
// work off crawl requests that had been placed by other peers to our crawl stack
// do nothing if either there are private processes to be done
// or there is no global crawl on the stack
if (noticeURL.remoteStackSize() == 0) return false;
if (processStack.size() > 0) {
log.logDebug("GlobalCrawl: any processe is in queue, dismissed (" +
"processStack=" + processStack.size() + ")");
return false;
}
if (noticeURL.localStackSize() > 0) {
log.logDebug("GlobalCrawl: any local crawl is in queue, dismissed (" +
"localStackSize=" + noticeURL.localStackSize() + ")");
return false;
}
// if the server is busy, we do this more slowly
if (serverJobs > 3) try {Thread.currentThread().sleep(100 * serverJobs);} catch (InterruptedException e) {}
// we don't want to crawl a global URL globally, since WE are the global part. (from this point of view)
plasmaCrawlNURL.entry nex = noticeURL.remotePop();
processCrawling(nex, nex.initiator());
return true;
}
private synchronized void processResourceStack(plasmaHTCache.Entry entry) {
@ -742,7 +770,7 @@ public class plasmaSwitchboard extends serverAbstractSwitch implements serverSwi
if (u == null) return plasmaURL.dummyHash; else return u.toString();
}
private synchronized void processCrawling(plasmaCrawlNURL.entry urlEntry, String initiator) {
private void processCrawling(plasmaCrawlNURL.entry urlEntry, String initiator) {
if (urlEntry.url() == null) return;
String profileHandle = urlEntry.profileHandle();
//System.out.println("DEBUG plasmaSwitchboard.processCrawling: profileHandle = " + profileHandle + ", urlEntry.url = " + urlEntry.url());
@ -915,7 +943,7 @@ public class plasmaSwitchboard extends serverAbstractSwitch implements serverSwi
remainingTime = time - (System.currentTimeMillis() - timestamp);
if (remainingTime < 500) remainingTime = 500;
if (remainingTime > 3000) remainingTime = 3000;
plasmaSearch.result acc = searchManager.order(idx, queryhashes, stopwords, order, remainingTime, 100);
plasmaSearch.result acc = searchManager.order(idx, queryhashes, stopwords, order, remainingTime, 10);
log.logDebug("SEARCH TIME AFTER ORDERING OF SEARCH RESULT: " + ((System.currentTimeMillis() - timestamp) / 1000) + " seconds");
// result is a List of urlEntry elements: prepare answer
@ -1028,7 +1056,7 @@ public class plasmaSwitchboard extends serverAbstractSwitch implements serverSwi
long timestamp = System.currentTimeMillis();
plasmaWordIndexEntity idx = searchManager.searchHashes(hashes, duetime * 8 / 10); // a nameless temporary index, not sorted by special order but by hash
long remainingTime = duetime - (System.currentTimeMillis() - timestamp);
plasmaSearch.result acc = searchManager.order(idx, hashes, stopwords, new char[]{plasmaSearch.O_QUALITY, plasmaSearch.O_AGE}, remainingTime, 100);
plasmaSearch.result acc = searchManager.order(idx, hashes, stopwords, new char[]{plasmaSearch.O_QUALITY, plasmaSearch.O_AGE}, remainingTime, 10);
// result is a List of urlEntry elements
if (acc == null) {

@ -73,19 +73,19 @@ public class plasmaWordIndex {
ramCache.setMaxWords(maxWords);
}
public synchronized int addEntry(String wordHash, plasmaWordIndexEntry entry) throws IOException {
public int addEntry(String wordHash, plasmaWordIndexEntry entry) throws IOException {
return ramCache.addEntryToIndexMem(wordHash, entry);
}
public synchronized plasmaWordIndexEntity getEntity(String wordHash, boolean deleteIfEmpty) throws IOException {
public plasmaWordIndexEntity getEntity(String wordHash, boolean deleteIfEmpty) throws IOException {
return ramCache.getIndexMem(wordHash, deleteIfEmpty);
}
public synchronized int sizeMin() {
public int sizeMin() {
return ramCache.sizeMin();
}
public synchronized int removeEntries(String wordHash, String[] urlHashes, boolean deleteComplete) throws IOException {
public int removeEntries(String wordHash, String[] urlHashes, boolean deleteComplete) throws IOException {
return ramCache.removeEntriesMem(wordHash, urlHashes, deleteComplete);
}

@ -78,12 +78,14 @@ public class plasmaWordIndexEntity {
File fp = theLocation.getParentFile();
if (fp != null) fp.mkdirs();
kelondroTree kt;
long cacheSize = theLocation.length();
if (cacheSize > 1048576) cacheSize = 1048576;
if (theLocation.exists()) {
// open existing index file
kt = new kelondroTree(theLocation, 0x400);
kt = new kelondroTree(theLocation, cacheSize);
} else {
// create new index file
kt = new kelondroTree(theLocation, 0x400, plasmaURL.urlHashLength, plasmaWordIndexEntry.attrSpaceShort);
kt = new kelondroTree(theLocation, cacheSize, plasmaURL.urlHashLength, plasmaWordIndexEntry.attrSpaceShort);
}
return kt; // everyone who get this should close it when finished!
}

@ -150,7 +150,7 @@ public class plasmaWordIndexRAMCache extends Thread {
}
}
private synchronized int flushSpecific(boolean greatest) throws IOException {
private int flushSpecific(boolean greatest) throws IOException {
//System.out.println("DEBUG: plasmaIndexRAMCache.flushSpecific(" + ((greatest) ? "greatest" : "smallest") + "); cache.size() = " + cache.size());
if ((hashScore.size() == 0) && (cache.size() == 0)) {
serverLog.logDebug("PLASMA INDEXING", "flushSpecific: called but cache is empty");
@ -205,11 +205,13 @@ public class plasmaWordIndexRAMCache extends Thread {
return pic.getIndex(wordHash, deleteIfEmpty);
}
public synchronized int addEntryToIndexMem(String wordHash, plasmaWordIndexEntry entry) throws IOException {
public int addEntryToIndexMem(String wordHash, plasmaWordIndexEntry entry) throws IOException {
// make space for new words
int flushc = 0;
//serverLog.logDebug("PLASMA INDEXING", "addEntryToIndexMem: cache.size=" + cache.size() + "; hashScore.size=" + hashScore.size());
while (hashScore.size() > maxWords) flushc += flushSpecific(true);
synchronized (hashScore) {
while (hashScore.size() > maxWords) flushc += flushSpecific(true);
}
//if (flushc > 0) serverLog.logDebug("PLASMA INDEXING", "addEntryToIndexMem - flushed " + flushc + " entries");
// put new words into cache

@ -78,8 +78,8 @@ import de.anomic.yacy.*;
public final class yacy {
// static objects
private static final String vString = "@REPL_VERSION@";
private static final String vDATE = "@REPL_DATE@";
private static final String vString = "0.367";
private static final String vDATE = "20050426";
private static final String copyright = "[ YACY Proxy v" + vString + ", build " + vDATE + " by Michael Christen / www.yacy.net ]";
private static final String hline = "-------------------------------------------------------------------------------";

@ -779,7 +779,6 @@ yourself
yourselves
zeigen
zeigt
zeit
zubehoer
zuletzt
zum

Loading…
Cancel
Save