From 9e2fc7e5fe4fa8753f02e3856979300a121272b5 Mon Sep 17 00:00:00 2001 From: orbiter Date: Sun, 25 Sep 2005 01:09:21 +0000 Subject: [PATCH] load balancing of crawl target domains git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@791 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- build.properties | 2 +- .../de/anomic/plasma/plasmaCrawlBalancer.java | 175 ++++++++++++++++++ source/de/anomic/plasma/plasmaCrawlNURL.java | 87 ++++++--- .../de/anomic/plasma/plasmaSwitchboard.java | 5 +- source/de/anomic/plasma/plasmaURL.java | 2 +- 5 files changed, 242 insertions(+), 29 deletions(-) create mode 100644 source/de/anomic/plasma/plasmaCrawlBalancer.java diff --git a/build.properties b/build.properties index 7a49e807a..f04d29585 100644 --- a/build.properties +++ b/build.properties @@ -3,7 +3,7 @@ javacSource=1.4 javacTarget=1.4 # Release Configuration -releaseVersion=0.403 +releaseVersion=0.404 releaseFile=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz #releaseFile=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz releaseDir=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr} diff --git a/source/de/anomic/plasma/plasmaCrawlBalancer.java b/source/de/anomic/plasma/plasmaCrawlBalancer.java new file mode 100644 index 000000000..7dbbac71d --- /dev/null +++ b/source/de/anomic/plasma/plasmaCrawlBalancer.java @@ -0,0 +1,175 @@ +// plasmaCrawlBalancer.java +// ----------------------- +// part of YaCy +// (C) by Michael Peter Christen; mc@anomic.de +// first published on http://www.anomic.de +// Frankfurt, Germany, 2005 +// created: 24.09.2005 +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// Using this software in any meaning (reading, learning, copying, compiling, +// running) means that you agree that the Author(s) is (are) not responsible +// for cost, loss of data or any harm that may be caused directly or indirectly +// by usage of this softare or this documentation. The usage of this software +// is on your own risk. The installation and usage (starting/running) of this +// software may allow other people or application to access your computer and +// any attached devices and is highly dependent on the configuration of the +// software which must be done by the user of the software; the author(s) is +// (are) also not responsible for proper configuration and usage of the +// software, even if provoked by documentation provided together with +// the software. +// +// Any changes to this file according to the GPL as documented in the file +// gpl.txt aside this file in the shipment you received can be done to the +// lines that follows this copyright notice here, but changes must not be +// done inside the copyright notive above. A re-distribution must contain +// the intact and unchanged copyright notice. +// Contributions and changes to the program code must be marked as such. + +package de.anomic.plasma; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.HashMap; +import java.util.Iterator; +import java.util.ArrayList; + +import de.anomic.kelondro.kelondroRecords; +import de.anomic.kelondro.kelondroStack; + +public class plasmaCrawlBalancer { + + private kelondroStack stack; + private HashMap domainStacks; + + public plasmaCrawlBalancer(File stackFile, long buffersize) throws IOException { + if (stackFile.exists()) + stack = new kelondroStack(stackFile, buffersize); + else + stack = new kelondroStack(stackFile, buffersize, new int[] {plasmaURL.urlHashLength}); + domainStacks = new HashMap(); + } + + public void close() { + try { flushAll(); } catch (IOException e) {} + try { stack.close(); } catch (IOException e) {} + stack = null; + } + + public Iterator iterator() { + // iterates byte[] - objects + return new KeyIterator(stack.iterator()); + } + + public int size() { + return stack.size() + sizeDomainStacks(); + } + + private int sizeDomainStacks() { + int sum = 0; + Iterator i = domainStacks.values().iterator(); + while (i.hasNext()) sum += ((ArrayList) i.next()).size(); + return sum; + } + + private void flushOnce() throws IOException { + // takes one entry from every domain stack and puts it on the file stack + synchronized (domainStacks) { + Iterator i = domainStacks.entrySet().iterator(); + Map.Entry entry; + ArrayList list; + while (i.hasNext()) { + entry = (Map.Entry) i.next(); + list = (ArrayList) entry.getValue(); + stack.push(new byte[][]{(byte[]) list.remove(0)}); + if (list.size() == 0) i.remove(); + } + } + } + + private void flushAll() throws IOException { + while (domainStacks.size() > 0) flushOnce(); + } + + public synchronized void add(String domain, byte[] hash) throws IOException { + //stack.push(new byte[][]{hash}); + ArrayList domainList = (ArrayList) domainStacks.get(domain); + if (domainList == null) { + // create new list + domainList = new ArrayList(); + domainList.add(hash); + domainStacks.put(domain, domainList); + } else { + // extend existent domain list + domainList.add(hash); + } + + // check size of domainStacks and flush + if ((domainStacks.size() > 20) || (sizeDomainStacks() > 400)) { + flushOnce(); + } + } + + public synchronized Object[] /*String, byte[]*/ get() throws IOException { + // returns a pair of domain/hash from the stack + // if the domain is unknown, a null/hash is returned + if (stack.size() > 0) { + return new Object[]{null, stack.pop()[0]}; + } else if (domainStacks.size() > 0) { + flushOnce(); + return new Object[]{null, stack.pop()[0]}; + } else { + return null; + } + } + + public synchronized byte[] top(int dist) throws IOException { + flushAll(); + return stack.top(dist)[0]; + } + + public void clear() throws IOException { + domainStacks.clear(); + stack.clear(); + } + + public class KeyIterator implements Iterator { + + Iterator ni; + + public KeyIterator(Iterator i) { + ni = i; + } + + public boolean hasNext() { + return ni.hasNext(); + } + + public Object next() { + try { + return ((kelondroRecords.Node) ni.next()).getKey(); + } catch (IOException e) { + return null; + } + } + + public void remove() { + } + + } + +} diff --git a/source/de/anomic/plasma/plasmaCrawlNURL.java b/source/de/anomic/plasma/plasmaCrawlNURL.java index 135182a7e..12bbb7451 100644 --- a/source/de/anomic/plasma/plasmaCrawlNURL.java +++ b/source/de/anomic/plasma/plasmaCrawlNURL.java @@ -69,10 +69,10 @@ public class plasmaCrawlNURL extends plasmaURL { public static final int STACK_TYPE_MOVIE = 12; // put on movie stack public static final int STACK_TYPE_MUSIC = 13; // put on music stack - private kelondroStack coreStack; // links found by crawling to depth-1 - private kelondroStack limitStack; // links found by crawling at target depth - private kelondroStack overhangStack; // links found by crawling at depth+1 - private kelondroStack remoteStack; // links from remote crawl orders + private plasmaCrawlBalancer coreStack; // links found by crawling to depth-1 + private plasmaCrawlBalancer limitStack; // links found by crawling at target depth + private plasmaCrawlBalancer overhangStack; // links found by crawling at depth+1 + private plasmaCrawlBalancer remoteStack; // links from remote crawl orders private kelondroStack imageStack; // links pointing to image resources private kelondroStack movieStack; // links pointing to movie resources private kelondroStack musicStack; // links pointing to music resources @@ -116,10 +116,10 @@ public class plasmaCrawlNURL extends plasmaURL { File imageStackFile = new File(cacheStacksPath, "urlNoticeImage0.stack"); File movieStackFile = new File(cacheStacksPath, "urlNoticeMovie0.stack"); File musicStackFile = new File(cacheStacksPath, "urlNoticeMusic0.stack"); - if (coreStackFile.exists()) coreStack = new kelondroStack(coreStackFile, 0); else coreStack = new kelondroStack(coreStackFile, 0, new int[] {plasmaURL.urlHashLength}); - if (limitStackFile.exists()) limitStack = new kelondroStack(limitStackFile, 0); else limitStack = new kelondroStack(limitStackFile, 0, new int[] {plasmaURL.urlHashLength}); - if (overhangStackFile.exists()) overhangStack = new kelondroStack(overhangStackFile, 0); else overhangStack = new kelondroStack(overhangStackFile, 0, new int[] {plasmaURL.urlHashLength}); - if (remoteStackFile.exists()) remoteStack = new kelondroStack(remoteStackFile, 0); else remoteStack = new kelondroStack(remoteStackFile, 0, new int[] {plasmaURL.urlHashLength}); + coreStack = new plasmaCrawlBalancer(coreStackFile, 0); + limitStack = new plasmaCrawlBalancer(limitStackFile, 0); + overhangStack = new plasmaCrawlBalancer(overhangStackFile, 0); + remoteStack = new plasmaCrawlBalancer(remoteStackFile, 0); if (imageStackFile.exists()) imageStack = new kelondroStack(imageStackFile, 0); else imageStack = new kelondroStack(imageStackFile, 0, new int[] {plasmaURL.urlHashLength}); if (movieStackFile.exists()) movieStack = new kelondroStack(movieStackFile, 0); else movieStack = new kelondroStack(movieStackFile, 0, new int[] {plasmaURL.urlHashLength}); if (musicStackFile.exists()) musicStack = new kelondroStack(musicStackFile, 0); else musicStack = new kelondroStack(musicStackFile, 0, new int[] {plasmaURL.urlHashLength}); @@ -128,13 +128,26 @@ public class plasmaCrawlNURL extends plasmaURL { stackIndex = new HashSet(); new initStackIndex().start(); } + + public void close() { + coreStack.close(); + try { + limitStack.close(); + overhangStack.close(); + remoteStack.close(); + imageStack.close(); + movieStack.close(); + musicStack.close(); + } catch (IOException e) {} + try { super.close(); } catch (IOException e) {} + } public class initStackIndex extends Thread { public void run() { Iterator i; try { //System.out.println("init coreStack index"); - i = coreStack.iterator(); while (i.hasNext()) stackIndex.add(new String(((kelondroRecords.Node) i.next()).getKey())); + i = coreStack.iterator(); while (i.hasNext()) stackIndex.add(new String((byte[]) i.next())); //System.out.println("init limitStack index"); i = limitStack.iterator(); while (i.hasNext()) stackIndex.add(new String(((kelondroRecords.Node) i.next()).getKey())); //System.out.println("init overhangStack index"); @@ -192,17 +205,17 @@ public class plasmaCrawlNURL extends plasmaURL { int depth, int anchors, int forkfactor, int stackMode) { Entry e = new Entry(initiator, url, referrer, name, loaddate, profile, depth, anchors, forkfactor); - push(stackMode, e.hash); + push(stackMode, url.getHost(), e.hash); return e; } - private void push(int stackType, String hash) { + private void push(int stackType, String domain, String hash) { try { switch (stackType) { - case STACK_TYPE_CORE: coreStack.push(new byte[][] {hash.getBytes()}); break; - case STACK_TYPE_LIMIT: limitStack.push(new byte[][] {hash.getBytes()}); break; - case STACK_TYPE_OVERHANG: overhangStack.push(new byte[][] {hash.getBytes()}); break; - case STACK_TYPE_REMOTE: remoteStack.push(new byte[][] {hash.getBytes()}); break; + case STACK_TYPE_CORE: coreStack.add(domain, hash.getBytes()); break; + case STACK_TYPE_LIMIT: limitStack.add(domain, hash.getBytes()); break; + case STACK_TYPE_OVERHANG: overhangStack.add(domain, hash.getBytes()); break; + case STACK_TYPE_REMOTE: remoteStack.add(domain, hash.getBytes()); break; case STACK_TYPE_IMAGE: imageStack.push(new byte[][] {hash.getBytes()}); break; case STACK_TYPE_MOVIE: movieStack.push(new byte[][] {hash.getBytes()}); break; case STACK_TYPE_MUSIC: musicStack.push(new byte[][] {hash.getBytes()}); break; @@ -239,16 +252,9 @@ public class plasmaCrawlNURL extends plasmaURL { } public void shift(int fromStack, int toStack) throws IOException { - switch (fromStack) { - case STACK_TYPE_CORE: push(toStack, new String(coreStack.pop()[0])); return; - case STACK_TYPE_LIMIT: push(toStack, new String(limitStack.pop()[0])); return; - case STACK_TYPE_OVERHANG: push(toStack, new String(overhangStack.pop()[0])); return; - case STACK_TYPE_REMOTE: push(toStack, new String(remoteStack.pop()[0])); return; - case STACK_TYPE_IMAGE: push(toStack, new String(imageStack.pop()[0])); return; - case STACK_TYPE_MOVIE: push(toStack, new String(movieStack.pop()[0])); return; - case STACK_TYPE_MUSIC: push(toStack, new String(musicStack.pop()[0])); return; - default: return; - } + Entry entry = pop(fromStack); + if (entry.url() == null) return; + push(toStack, entry.url.getHost(), entry.hash()); } public void clear(int stackType) { @@ -281,6 +287,21 @@ public class plasmaCrawlNURL extends plasmaURL { } } + private Entry pop(plasmaCrawlBalancer balancer) { + // this is a filo - pop + try { + if (balancer.size() > 0) { + Entry e = new Entry(new String((byte[]) balancer.get()[1])); + stackIndex.remove(e.hash); + return e; + } else { + return null; + } + } catch (IOException e) { + return null; + } + } + private Entry[] top(kelondroStack stack, int count) { // this is a filo - top if (count > stack.size()) count = stack.size(); @@ -297,6 +318,22 @@ public class plasmaCrawlNURL extends plasmaURL { } } + private Entry[] top(plasmaCrawlBalancer balancer, int count) { + // this is a filo - top + if (count > balancer.size()) count = balancer.size(); + ArrayList list = new ArrayList(count); + try { + for (int i = 0; i < count; i++) { + byte[] hash = balancer.top(i); + if (hash == null) continue; + list.add(new Entry(new String(hash))); + } + return (Entry[])list.toArray(new Entry[list.size()]); + } catch (IOException e) { + return null; + } + } + public synchronized Entry getEntry(String hash) { return new Entry(hash); } diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index 36fbdf360..569b0f590 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -814,11 +814,12 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser return false; } - if ((coreCrawlJobSize() == 0) && (limitCrawlTriggerJobSize() > 100)) { + if ((coreCrawlJobSize() <= 20) && (limitCrawlTriggerJobSize() > 100)) { // it is not efficient if the core crawl job is empty and we have too much to do // move some tasks to the core crawl job - int toshift = limitCrawlTriggerJobSize() / 10; + int toshift = limitCrawlTriggerJobSize() / 5; if (toshift > 1000) toshift = 1000; + if (toshift > limitCrawlTriggerJobSize()) toshift = limitCrawlTriggerJobSize(); try { for (int i = 0; i < toshift; i++) { urlPool.noticeURL.shift(plasmaCrawlNURL.STACK_TYPE_LIMIT, plasmaCrawlNURL.STACK_TYPE_CORE); diff --git a/source/de/anomic/plasma/plasmaURL.java b/source/de/anomic/plasma/plasmaURL.java index 39cb4c1d6..43e4ea0f3 100644 --- a/source/de/anomic/plasma/plasmaURL.java +++ b/source/de/anomic/plasma/plasmaURL.java @@ -106,7 +106,7 @@ public class plasmaURL { } public void close() throws IOException { - urlHashCache.close(); + if (urlHashCache != null) urlHashCache.close(); } public boolean exists(String urlHash) {