Added new data structure to be used by the balancer (not used yet).

These data structures will enable the balancer to store the crawl queue
into individual queues, one each for a single host.
pull/1/head
orbiter 12 years ago
parent 0e8d752462
commit f90d5296cb

@ -0,0 +1,256 @@
/**
* HostQueue
* Copyright 2013 by Michael Christen
* First released 24.09.2013 at http://yacy.net
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program in the file lgpl21.txt
* If not, see <http://www.gnu.org/licenses/>.
*/
package net.yacy.crawler;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import net.yacy.cora.document.encoding.ASCII;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.crawler.data.CrawlProfile;
import net.yacy.crawler.retrieval.Request;
import net.yacy.crawler.robots.RobotsTxt;
import net.yacy.kelondro.data.meta.URIMetadataRow;
import net.yacy.kelondro.index.BufferedObjectIndex;
import net.yacy.kelondro.index.Row;
import net.yacy.kelondro.index.RowHandleSet;
import net.yacy.kelondro.table.Table;
import net.yacy.kelondro.util.MemoryControl;
import net.yacy.repository.Blacklist.BlacklistType;
import net.yacy.search.Switchboard;
public class HostQueue {
public static final String indexSuffix = ".stack";
private static final int EcoFSBufferSize = 1000;
private static final int objectIndexBufferSize = 1000;
private static final int MAX_DOUBLE_PUSH_CHECK = 100000;
private final String hostHash;
private final File queuesPath;
private BufferedObjectIndex requestStack;
private HandleSet urlHashDoubleCheck;
public HostQueue(
final File queuesPath,
final String hostHash,
final boolean useTailCache,
final boolean exceed134217727) {
this.hostHash = hostHash;
this.queuesPath = queuesPath;
this.urlHashDoubleCheck = new RowHandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0);
// create a stack for newly entered entries
if (!(this.queuesPath.exists())) this.queuesPath.mkdir(); // make the path
this.queuesPath.mkdirs();
final File f = new File(this.queuesPath, this.hostHash + indexSuffix);
try {
this.requestStack = new BufferedObjectIndex(new Table(f, Request.rowdef, EcoFSBufferSize, 0, useTailCache, exceed134217727, true), objectIndexBufferSize);
} catch (final SpaceExceededException e) {
try {
this.requestStack = new BufferedObjectIndex(new Table(f, Request.rowdef, 0, 0, false, exceed134217727, true), objectIndexBufferSize);
} catch (final SpaceExceededException e1) {
ConcurrentLog.logException(e1);
}
}
ConcurrentLog.info("Balancer", "opened balancer file with " + this.requestStack.size() + " entries from " + f.toString());
}
public synchronized void close() {
int sizeBeforeClose = this.size();
if (this.urlHashDoubleCheck != null) {
this.urlHashDoubleCheck.clear();
this.urlHashDoubleCheck = null;
}
if (this.requestStack != null) {
this.requestStack.close();
this.requestStack = null;
}
if (sizeBeforeClose == 0) {
// clean up
new File(this.queuesPath, this.hostHash + indexSuffix).delete();
}
}
public void clear() {
try {
this.requestStack.clear();
} catch (final IOException e) {
ConcurrentLog.logException(e);
}
this.urlHashDoubleCheck.clear();
}
public Request get(final byte[] urlhash) throws IOException {
assert urlhash != null;
if (this.requestStack == null) return null; // case occurs during shutdown
final Row.Entry entry = this.requestStack.get(urlhash, false);
if (entry == null) return null;
return new Request(entry);
}
public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException {
// first find a list of url hashes that shall be deleted
final HandleSet urlHashes = new RowHandleSet(this.requestStack.row().primaryKeyLength, Base64Order.enhancedCoder, 100);
final long terminate = timeout == Long.MAX_VALUE ? Long.MAX_VALUE : (timeout > 0) ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
synchronized (this) {
final Iterator<Row.Entry> i = this.requestStack.rows();
Row.Entry rowEntry;
Request crawlEntry;
while (i.hasNext() && (System.currentTimeMillis() < terminate)) {
rowEntry = i.next();
crawlEntry = new Request(rowEntry);
if (crawlEntry.profileHandle().equals(profileHandle)) {
urlHashes.put(crawlEntry.url().hash());
}
}
}
// then delete all these urls from the queues and the file index
return remove(urlHashes);
}
/**
* remove urls from the queue
* @param urlHashes, a list of hashes that shall be removed
* @return number of entries that had been removed
* @throws IOException
*/
public synchronized int remove(final HandleSet urlHashes) throws IOException {
final int s = this.requestStack.size();
int removedCounter = 0;
for (final byte[] urlhash: urlHashes) {
final Row.Entry entry = this.requestStack.remove(urlhash);
if (entry != null) removedCounter++;
// remove from double-check caches
this.urlHashDoubleCheck.remove(urlhash);
}
if (removedCounter == 0) return 0;
assert this.requestStack.size() + removedCounter == s : "urlFileIndex.size() = " + this.requestStack.size() + ", s = " + s;
return removedCounter;
}
public boolean has(final byte[] urlhashb) {
return this.requestStack.has(urlhashb) || this.urlHashDoubleCheck.has(urlhashb);
}
public int size() {
return this.requestStack.size();
}
public boolean isEmpty() {
return this.requestStack.isEmpty();
}
public String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException {
assert entry != null;
final byte[] hash = entry.url().hash();
synchronized (this) {
// double-check
if (this.urlHashDoubleCheck.has(hash)) return "double occurrence in double_push_check";
if (this.requestStack.has(hash)) return "double occurrence in urlFileIndex";
if (this.urlHashDoubleCheck.size() > MAX_DOUBLE_PUSH_CHECK || MemoryControl.shortStatus()) this.urlHashDoubleCheck.clear();
this.urlHashDoubleCheck.put(hash);
// increase dom counter
if (profile != null && profile.domMaxPages() != Integer.MAX_VALUE && profile.domMaxPages() > 0) {
profile.domInc(entry.url().getHost());
}
// add to index
final int s = this.requestStack.size();
this.requestStack.put(entry.toRow());
assert s < this.requestStack.size() : "hash = " + ASCII.String(hash) + ", s = " + s + ", size = " + this.requestStack.size();
assert this.requestStack.has(hash) : "hash = " + ASCII.String(hash);
// add the hash to a queue if the host is unknown to get this fast into the balancer
// now disabled to prevent that a crawl 'freezes' to a specific domain which hosts a lot of pages; the queues are filled anyway
//if (!this.domainStacks.containsKey(entry.url().getHost())) pushHashToDomainStacks(entry.url().getHost(), entry.url().hash());
}
robots.ensureExist(entry.url(), profile.getAgent(), true); // concurrently load all robots.txt
return null;
}
public Request pop() throws IOException {
// returns a crawl entry from the stack and ensures minimum delta times
Request crawlEntry = null;
while (!this.requestStack.isEmpty()) {
synchronized (this) {
Row.Entry rowEntry = this.requestStack.removeOne();
if (rowEntry == null) return null;
crawlEntry = new Request(rowEntry);
// check blacklist (again) because the user may have created blacklist entries after the queue has been filled
if (Switchboard.urlBlacklist.isListed(BlacklistType.CRAWLER, crawlEntry.url())) {
ConcurrentLog.fine("CRAWLER", "URL '" + crawlEntry.url() + "' is in blacklist.");
continue;
}
break;
}
}
if (crawlEntry == null) return null;
return crawlEntry;
}
public Iterator<Request> iterator() throws IOException {
return new EntryIterator();
}
private class EntryIterator implements Iterator<Request> {
private Iterator<Row.Entry> rowIterator;
public EntryIterator() throws IOException {
this.rowIterator = HostQueue.this.requestStack.rows();
}
@Override
public boolean hasNext() {
return (this.rowIterator == null) ? false : this.rowIterator.hasNext();
}
@Override
public Request next() {
final Row.Entry entry = this.rowIterator.next();
try {
return (entry == null) ? null : new Request(entry);
} catch (final IOException e) {
ConcurrentLog.logException(e);
this.rowIterator = null;
return null;
}
}
@Override
public void remove() {
if (this.rowIterator != null) this.rowIterator.remove();
}
}
}

@ -0,0 +1,169 @@
/**
* HostQueues
* Copyright 2013 by Michael Christen
* First released 24.09.2013 at http://yacy.net
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program in the file lgpl21.txt
* If not, see <http://www.gnu.org/licenses/>.
*/
package net.yacy.crawler;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import net.yacy.cora.document.encoding.ASCII;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.crawler.data.CrawlProfile;
import net.yacy.crawler.retrieval.Request;
import net.yacy.crawler.robots.RobotsTxt;
import net.yacy.kelondro.data.word.Word;
import net.yacy.kelondro.index.RowHandleSet;
/**
* wrapper for single HostQueue queues; this is a collection of such queues.
* All these queues are stored in a common directory for the queue stacks
*/
public class HostQueues {
private final File queuesPath;
private final boolean useTailCache;
private final boolean exceed134217727;
private final Map<String, HostQueue> queues;
public HostQueues(
final File queuesPath,
final boolean useTailCache,
final boolean exceed134217727) {
this.queuesPath = queuesPath;
this.useTailCache = useTailCache;
this.exceed134217727 = exceed134217727;
// create a stack for newly entered entries
if (!(queuesPath.exists())) queuesPath.mkdir(); // make the path
this.queuesPath.mkdirs();
this.queues = new HashMap<String, HostQueue>();
String[] list = this.queuesPath.list();
for (String queuefile: list) {
if (queuefile.endsWith(HostQueue.indexSuffix)) {
String hosthash = queuefile.substring(0, queuefile.length() - HostQueue.indexSuffix.length());
HostQueue queue = new HostQueue(this.queuesPath, hosthash, this.useTailCache, this.exceed134217727);
this.queues.put(hosthash, queue);
}
}
}
public synchronized void close() {
for (HostQueue queue: this.queues.values()) queue.close();
this.queues.clear();
}
public void clear() {
for (HostQueue queue: this.queues.values()) queue.clear();
this.queues.clear();
}
public Request get(final byte[] urlhash) throws IOException {
String hosthash = ASCII.String(urlhash, 6, 6);
HostQueue queue = this.queues.get(hosthash);
if (queue == null) return null;
return queue.get(urlhash);
}
public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException {
int c = 0;
for (HostQueue queue: this.queues.values()) c += queue.removeAllByProfileHandle(profileHandle, timeout);
return c;
}
public synchronized int remove(final HandleSet urlHashes) throws IOException {
Map<String, HandleSet> removeLists = new HashMap<String, HandleSet>();
for (byte[] urlhash: urlHashes) {
String hosthash = ASCII.String(urlhash, 6, 6);
HandleSet removeList = removeLists.get(hosthash);
if (removeList == null) {
removeList = new RowHandleSet(Word.commonHashLength, Base64Order.enhancedCoder, 100);
removeLists.put(hosthash, removeList);
}
try {removeList.put(urlhash);} catch (SpaceExceededException e) {}
}
int c = 0;
for (Map.Entry<String, HandleSet> entry: removeLists.entrySet()) {
HostQueue queue = this.queues.get(entry.getKey());
if (queue != null) c += queue.remove(entry.getValue());
}
return c;
}
public boolean has(final byte[] urlhashb) {
String hosthash = ASCII.String(urlhashb, 6, 6);
HostQueue queue = this.queues.get(hosthash);
if (queue == null) return false;
return queue.has(urlhashb);
}
public int size() {
int c = 0;
for (HostQueue queue: this.queues.values()) c += queue.size();
return c;
}
public boolean isEmpty() {
for (HostQueue queue: this.queues.values()) if (!queue.isEmpty()) return false;
return true;
}
/**
* push a request to one of the host queues. If the queue does not exist, it is created
* @param entry
* @param profile
* @param robots
* @return null if everything is ok or a string with an error message if the push is not allowed according to the crawl profile or robots
* @throws IOException
* @throws SpaceExceededException
*/
public String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException {
String hosthash = ASCII.String(entry.url().hash(), 6, 6);
HostQueue queue = this.queues.get(hosthash);
if (queue == null) {
queue = new HostQueue(this.queuesPath, hosthash, this.useTailCache, this.exceed134217727);
this.queues.put(hosthash, queue);
}
return queue.push(entry, profile, robots);
}
/**
* remove one request from all stacks except from those as listed in notFromHost
* @param notFromHost do not collect from these hosts
* @return a list of requests
* @throws IOException
*/
public List<Request> pop(Set<String> notFromHost) throws IOException {
ArrayList<Request> requests = new ArrayList<Request>();
for (Map.Entry<String, HostQueue> entry: this.queues.entrySet()) {
if (notFromHost.contains(entry.getKey())) continue;
Request r = entry.getValue().pop();
if (r != null) requests.add(r);
}
return requests;
}
}
Loading…
Cancel
Save