You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
542 lines
21 KiB
542 lines
21 KiB
/**
|
|
* HostQueue
|
|
* Copyright 2013 by Michael Christen
|
|
* First released 24.09.2013 at http://yacy.net
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Lesser General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2.1 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Lesser General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Lesser General Public License
|
|
* along with this program in the file lgpl21.txt
|
|
* If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
package net.yacy.crawler;
|
|
|
|
import java.io.File;
|
|
import java.io.IOException;
|
|
import java.lang.reflect.Array;
|
|
import java.net.MalformedURLException;
|
|
import java.util.ArrayList;
|
|
import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.TreeMap;
|
|
|
|
import net.yacy.cora.document.encoding.ASCII;
|
|
import net.yacy.cora.document.encoding.UTF8;
|
|
import net.yacy.cora.document.id.DigestURL;
|
|
import net.yacy.cora.order.Base64Order;
|
|
import net.yacy.cora.protocol.ClientIdentification;
|
|
import net.yacy.cora.storage.HandleSet;
|
|
import net.yacy.cora.util.ConcurrentLog;
|
|
import net.yacy.cora.util.SpaceExceededException;
|
|
import net.yacy.crawler.data.CrawlProfile;
|
|
import net.yacy.crawler.data.Latency;
|
|
import net.yacy.crawler.retrieval.Request;
|
|
import net.yacy.crawler.robots.RobotsTxt;
|
|
import net.yacy.kelondro.data.word.Word;
|
|
import net.yacy.kelondro.index.BufferedObjectIndex;
|
|
import net.yacy.kelondro.index.Index;
|
|
import net.yacy.kelondro.index.OnDemandOpenFileIndex;
|
|
import net.yacy.kelondro.index.Row;
|
|
import net.yacy.kelondro.index.RowHandleSet;
|
|
import net.yacy.kelondro.table.Table;
|
|
import static net.yacy.kelondro.util.FileUtils.deletedelete;
|
|
import net.yacy.kelondro.util.kelondroException;
|
|
import net.yacy.repository.Blacklist.BlacklistType;
|
|
import net.yacy.search.Switchboard;
|
|
|
|
public class HostQueue implements Balancer {
|
|
|
|
private final static ConcurrentLog log = new ConcurrentLog("HostQueue");
|
|
|
|
public static final String indexSuffix = ".stack";
|
|
private static final int EcoFSBufferSize = 1000;
|
|
private static final int objectIndexBufferSize = 1000;
|
|
|
|
private final File hostPath;
|
|
private final String hostName;
|
|
private String hostHash;
|
|
private final int port;
|
|
private final boolean exceed134217727;
|
|
private final boolean onDemand;
|
|
private TreeMap<Integer, Index> depthStacks;
|
|
|
|
public HostQueue (
|
|
final File hostsPath,
|
|
final String hostName,
|
|
final int port,
|
|
final boolean onDemand,
|
|
final boolean exceed134217727) {
|
|
this.onDemand = onDemand;
|
|
this.exceed134217727 = exceed134217727;
|
|
this.hostName = (hostName == null) ? "localhost" : hostName; // might be null (file://) but hostqueue needs a name (for queue file)
|
|
this.port = port;
|
|
this.hostPath = new File(hostsPath, this.hostName + "." + this.port);
|
|
init();
|
|
}
|
|
|
|
public HostQueue (
|
|
final File hostPath,
|
|
final boolean onDemand,
|
|
final boolean exceed134217727) {
|
|
this.onDemand = onDemand;
|
|
this.exceed134217727 = exceed134217727;
|
|
this.hostPath = hostPath;
|
|
// parse the hostName and port from the file name
|
|
String filename = hostPath.getName();
|
|
int p = filename.lastIndexOf('.');
|
|
if (p < 0) throw new RuntimeException("hostPath name must contain a dot: " + filename);
|
|
this.hostName = filename.substring(0, p);
|
|
this.port = Integer.parseInt(filename.substring(p + 1));
|
|
init();
|
|
}
|
|
|
|
private final void init() {
|
|
try {
|
|
if (this.hostName == null)
|
|
this.hostHash="";
|
|
else
|
|
this.hostHash = DigestURL.hosthash(this.hostName, this.port);
|
|
} catch (MalformedURLException e) {
|
|
this.hostHash = "";
|
|
}
|
|
if (!(this.hostPath.exists())) this.hostPath.mkdirs();
|
|
this.depthStacks = new TreeMap<Integer, Index>();
|
|
int size = openAllStacks();
|
|
if (log.isInfo()) log.info("opened HostQueue " + this.hostPath.getAbsolutePath() + " with " + size + " urls.");
|
|
}
|
|
|
|
public String getHost() {
|
|
return this.hostName;
|
|
}
|
|
|
|
public int getPort() {
|
|
return this.port;
|
|
}
|
|
|
|
private int openAllStacks() {
|
|
String[] l = this.hostPath.list();
|
|
int c = 0;
|
|
if (l != null) for (String s: l) {
|
|
if (s.endsWith(indexSuffix)) try {
|
|
int depth = Integer.parseInt(s.substring(0, s.length() - indexSuffix.length()));
|
|
File stackFile = new File(this.hostPath, s);
|
|
Index depthStack = openStack(stackFile);
|
|
if (depthStack != null) {
|
|
int sz = depthStack.size();
|
|
if (sz == 0) {
|
|
depthStack.close();
|
|
deletedelete(stackFile);
|
|
} else {
|
|
this.depthStacks.put(depth, depthStack);
|
|
c += sz;
|
|
}
|
|
}
|
|
} catch (NumberFormatException e) {}
|
|
}
|
|
return c;
|
|
}
|
|
|
|
public synchronized int getLowestStackDepth() {
|
|
while (this.depthStacks.size() > 0) {
|
|
Map.Entry<Integer, Index> entry;
|
|
synchronized (this) {
|
|
entry = this.depthStacks.firstEntry();
|
|
}
|
|
if (entry == null) return 0; // happens only if map is empty
|
|
if (entry.getValue().size() == 0) {
|
|
entry.getValue().close();
|
|
deletedelete(getFile(entry.getKey()));
|
|
this.depthStacks.remove(entry.getKey());
|
|
continue;
|
|
}
|
|
return entry.getKey();
|
|
}
|
|
// this should not happen but it happens if a deletion is done
|
|
//assert false;
|
|
return 0;
|
|
}
|
|
|
|
private Index getLowestStack() {
|
|
while (this.depthStacks.size() > 0) {
|
|
Map.Entry<Integer, Index> entry;
|
|
synchronized (this) {
|
|
entry = this.depthStacks.firstEntry();
|
|
}
|
|
if (entry == null) return null; // happens only if map is empty
|
|
if (entry.getValue().size() == 0) {
|
|
entry.getValue().close();
|
|
deletedelete(getFile(entry.getKey()));
|
|
this.depthStacks.remove(entry.getKey());
|
|
continue;
|
|
}
|
|
return entry.getValue();
|
|
}
|
|
// this should not happen
|
|
//assert false;
|
|
return null;
|
|
}
|
|
|
|
private Index getStack(int depth) {
|
|
Index depthStack;
|
|
synchronized (this) {
|
|
depthStack = this.depthStacks.get(depth);
|
|
if (depthStack != null) return depthStack;
|
|
}
|
|
// create a new stack
|
|
synchronized (this) {
|
|
// check again
|
|
depthStack = this.depthStacks.get(depth);
|
|
if (depthStack != null) return depthStack;
|
|
// now actually create a new stack
|
|
final File f = getFile(depth);
|
|
depthStack = openStack(f);
|
|
if (depthStack != null) this.depthStacks.put(depth, depthStack);
|
|
}
|
|
return depthStack;
|
|
}
|
|
|
|
private File getFile(int depth) {
|
|
String name = Integer.toString(depth);
|
|
while (name.length() < 4) name = "0" + name;
|
|
final File f = new File(this.hostPath, name + indexSuffix);
|
|
return f;
|
|
}
|
|
|
|
private Index openStack(File f) {
|
|
for (int i = 0; i < 10; i++) {
|
|
// we try that again if it fails because it shall not fail
|
|
if (this.onDemand && (!f.exists() || f.length() < 10000)) {
|
|
try {
|
|
return new BufferedObjectIndex(new OnDemandOpenFileIndex(f, Request.rowdef, exceed134217727), objectIndexBufferSize);
|
|
} catch (kelondroException e) {
|
|
// possibly the file was closed meanwhile
|
|
ConcurrentLog.logException(e);
|
|
}
|
|
} else {
|
|
try {
|
|
return new BufferedObjectIndex(new Table(f, Request.rowdef, EcoFSBufferSize, 0, false, exceed134217727, true), objectIndexBufferSize);
|
|
} catch (final SpaceExceededException e) {
|
|
try {
|
|
return new BufferedObjectIndex(new Table(f, Request.rowdef, 0, 0, false, exceed134217727, true), objectIndexBufferSize);
|
|
} catch (final SpaceExceededException e1) {
|
|
ConcurrentLog.logException(e1);
|
|
}
|
|
} catch (kelondroException e) {
|
|
// possibly the file was closed meanwhile
|
|
ConcurrentLog.logException(e);
|
|
}
|
|
}
|
|
}
|
|
return null;
|
|
}
|
|
|
|
@Override
|
|
public synchronized void close() {
|
|
for (Map.Entry<Integer, Index> entry: this.depthStacks.entrySet()) {
|
|
int size = entry.getValue().size();
|
|
entry.getValue().close();
|
|
if (size == 0) deletedelete(getFile(entry.getKey()));
|
|
}
|
|
this.depthStacks.clear();
|
|
String[] l = this.hostPath.list();
|
|
if ((l == null || l.length == 0) && this.hostPath != null) deletedelete(this.hostPath);
|
|
}
|
|
|
|
@Override
|
|
public synchronized void clear() {
|
|
for (Map.Entry<Integer, Index> entry: this.depthStacks.entrySet()) {
|
|
entry.getValue().close();
|
|
deletedelete(getFile(entry.getKey()));
|
|
}
|
|
this.depthStacks.clear();
|
|
String[] l = this.hostPath.list();
|
|
if (l != null) for (String s: l) {
|
|
deletedelete(new File(this.hostPath, s));
|
|
}
|
|
deletedelete(this.hostPath);
|
|
}
|
|
|
|
@Override
|
|
public Request get(final byte[] urlhash) throws IOException {
|
|
assert urlhash != null;
|
|
if (this.depthStacks == null) return null; // case occurs during shutdown
|
|
for (Index depthStack: this.depthStacks.values()) {
|
|
final Row.Entry entry = depthStack.get(urlhash, false);
|
|
if (entry == null) return null;
|
|
return new Request(entry);
|
|
}
|
|
return null;
|
|
}
|
|
|
|
@Override
|
|
public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException {
|
|
// first find a list of url hashes that shall be deleted
|
|
final long terminate = timeout == Long.MAX_VALUE ? Long.MAX_VALUE : (timeout > 0) ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
|
|
int count = 0;
|
|
synchronized (this) {
|
|
for (Index depthStack: this.depthStacks.values()) {
|
|
final HandleSet urlHashes = new RowHandleSet(Word.commonHashLength, Base64Order.enhancedCoder, 100);
|
|
final Iterator<Row.Entry> i = depthStack.rows();
|
|
Row.Entry rowEntry;
|
|
Request crawlEntry;
|
|
while (i.hasNext() && (System.currentTimeMillis() < terminate)) {
|
|
rowEntry = i.next();
|
|
crawlEntry = new Request(rowEntry);
|
|
if (crawlEntry.profileHandle().equals(profileHandle)) {
|
|
urlHashes.put(crawlEntry.url().hash());
|
|
}
|
|
if (System.currentTimeMillis() > terminate) break;
|
|
}
|
|
for (final byte[] urlhash: urlHashes) {
|
|
depthStack.remove(urlhash);
|
|
count++;
|
|
}
|
|
}
|
|
}
|
|
return count;
|
|
}
|
|
|
|
/**
|
|
* delete all urls which are stored for given host hashes
|
|
* @param hosthashes
|
|
* @return number of deleted urls
|
|
*/
|
|
@Override
|
|
public int removeAllByHostHashes(final Set<String> hosthashes) {
|
|
for (String h: hosthashes) {
|
|
if (this.hostHash.equals(h)) {
|
|
int s = this.size();
|
|
this.clear();
|
|
return s;
|
|
}
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* remove urls from the queue
|
|
* @param urlHashes, a list of hashes that shall be removed
|
|
* @return number of entries that had been removed
|
|
* @throws IOException
|
|
*/
|
|
@Override
|
|
public synchronized int remove(final HandleSet urlHashes) throws IOException {
|
|
int removedCounter = 0;
|
|
for (Index depthStack: this.depthStacks.values()) {
|
|
final int s = depthStack.size();
|
|
for (final byte[] urlhash: urlHashes) {
|
|
final Row.Entry entry = depthStack.remove(urlhash);
|
|
if (entry != null) removedCounter++;
|
|
}
|
|
if (removedCounter == 0) return 0;
|
|
assert depthStack.size() + removedCounter == s : "urlFileIndex.size() = " + depthStack.size() + ", s = " + s;
|
|
}
|
|
return removedCounter;
|
|
}
|
|
|
|
@Override
|
|
public boolean has(final byte[] urlhashb) {
|
|
for (Index depthStack: this.depthStacks.values()) {
|
|
if (depthStack.has(urlhashb)) return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
@Override
|
|
public int size() {
|
|
int size = 0;
|
|
for (Index depthStack: this.depthStacks.values()) {
|
|
size += depthStack.size();
|
|
}
|
|
return size;
|
|
}
|
|
|
|
@Override
|
|
public boolean isEmpty() {
|
|
for (Index depthStack: this.depthStacks.values()) {
|
|
if (!depthStack.isEmpty()) return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
@Override
|
|
public String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException {
|
|
assert entry != null;
|
|
final byte[] hash = entry.url().hash();
|
|
synchronized (this) {
|
|
// double-check
|
|
if (this.has(hash)) return "double occurrence in urlFileIndex";
|
|
|
|
// increase dom counter
|
|
if (profile != null) {
|
|
int maxPages = profile.domMaxPages();
|
|
if (maxPages != Integer.MAX_VALUE && maxPages > 0) {
|
|
String host = entry.url().getHost();
|
|
profile.domInc(host);
|
|
}
|
|
}
|
|
|
|
// add to index
|
|
Index depthStack = getStack(entry.depth());
|
|
final int s = depthStack.size();
|
|
depthStack.put(entry.toRow());
|
|
assert s < depthStack.size() : "hash = " + ASCII.String(hash) + ", s = " + s + ", size = " + depthStack.size();
|
|
assert depthStack.has(hash) : "hash = " + ASCII.String(hash);
|
|
}
|
|
return null;
|
|
}
|
|
|
|
|
|
@Override
|
|
public Request pop(boolean delay, CrawlSwitchboard cs, RobotsTxt robots) throws IOException {
|
|
// returns a crawl entry from the stack and ensures minimum delta times
|
|
long sleeptime = 0;
|
|
Request crawlEntry = null;
|
|
CrawlProfile profileEntry = null;
|
|
synchronized (this) {
|
|
mainloop: while (true) {
|
|
Index depthStack = getLowestStack();
|
|
if (depthStack == null) return null;
|
|
Row.Entry rowEntry = null;
|
|
while (depthStack.size() > 0) {
|
|
rowEntry = depthStack.removeOne();
|
|
if (rowEntry != null) break;
|
|
}
|
|
if (rowEntry == null) continue mainloop;
|
|
crawlEntry = new Request(rowEntry);
|
|
|
|
// check blacklist (again) because the user may have created blacklist entries after the queue has been filled
|
|
if (Switchboard.urlBlacklist.isListed(BlacklistType.CRAWLER, crawlEntry.url())) {
|
|
if (log.isFine()) log.fine("URL '" + crawlEntry.url() + "' is in blacklist.");
|
|
continue mainloop;
|
|
}
|
|
|
|
// at this point we must check if the crawlEntry has relevance because the crawl profile still exists
|
|
// if not: return null. A calling method must handle the null value and try again
|
|
profileEntry = cs.get(UTF8.getBytes(crawlEntry.profileHandle()));
|
|
if (profileEntry == null) {
|
|
if (log.isFine()) log.fine("no profile entry for handle " + crawlEntry.profileHandle());
|
|
continue mainloop;
|
|
}
|
|
|
|
// depending on the caching policy we need sleep time to avoid DoS-like situations
|
|
sleeptime = Latency.getDomainSleepTime(robots, profileEntry, crawlEntry.url());
|
|
break;
|
|
}
|
|
}
|
|
if (crawlEntry == null) return null;
|
|
ClientIdentification.Agent agent = profileEntry == null ? ClientIdentification.yacyInternetCrawlerAgent : profileEntry.getAgent();
|
|
long robotsTime = Latency.getRobotsTime(robots, crawlEntry.url(), agent);
|
|
Latency.updateAfterSelection(crawlEntry.url(), profileEntry == null ? 0 : robotsTime);
|
|
if (delay && sleeptime > 0) {
|
|
// force a busy waiting here
|
|
// in best case, this should never happen if the balancer works properly
|
|
// this is only to protection against the worst case, where the crawler could
|
|
// behave in a DoS-manner
|
|
if (log.isInfo()) log.info("forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ": " + Latency.waitingRemainingExplain(crawlEntry.url(), robots, agent));
|
|
long loops = sleeptime / 1000;
|
|
long rest = sleeptime % 1000;
|
|
if (loops < 3) {
|
|
rest = rest + 1000 * loops;
|
|
loops = 0;
|
|
}
|
|
Thread.currentThread().setName("Balancer waiting for " + crawlEntry.url().getHost() + ": " + sleeptime + " milliseconds");
|
|
synchronized(this) {
|
|
// must be synchronized here to avoid 'takeover' moves from other threads which then idle the same time which would not be enough
|
|
if (rest > 0) {try {this.wait(rest);} catch (final InterruptedException e) {}}
|
|
for (int i = 0; i < loops; i++) {
|
|
if (log.isInfo()) log.info("waiting for " + crawlEntry.url().getHost() + ": " + (loops - i) + " seconds remaining...");
|
|
try {this.wait(1000); } catch (final InterruptedException e) {}
|
|
}
|
|
}
|
|
Latency.updateAfterSelection(crawlEntry.url(), robotsTime);
|
|
}
|
|
return crawlEntry;
|
|
}
|
|
|
|
@Override
|
|
public Iterator<Request> iterator() throws IOException {
|
|
final Iterator<Map.Entry<Integer, Index>> depthIterator = this.depthStacks.entrySet().iterator();
|
|
@SuppressWarnings("unchecked")
|
|
final Iterator<Row.Entry>[] rowIterator = (Iterator<Row.Entry>[]) Array.newInstance(Iterator.class, 1);
|
|
rowIterator[0] = null;
|
|
return new Iterator<Request>() {
|
|
@Override
|
|
public boolean hasNext() {
|
|
return depthIterator.hasNext() || (rowIterator[0] != null && rowIterator[0].hasNext());
|
|
}
|
|
@Override
|
|
public Request next() {
|
|
synchronized (HostQueue.this) {
|
|
try {
|
|
while (rowIterator[0] == null || !rowIterator[0].hasNext()) {
|
|
Map.Entry<Integer, Index> entry = depthIterator.next();
|
|
rowIterator[0] = entry.getValue().iterator();
|
|
}
|
|
if (!rowIterator[0].hasNext()) return null;
|
|
Row.Entry rowEntry = rowIterator[0].next();
|
|
if (rowEntry == null) return null;
|
|
return new Request(rowEntry);
|
|
} catch (Throwable e) {
|
|
return null;
|
|
}
|
|
}
|
|
}
|
|
@Override
|
|
public void remove() {
|
|
rowIterator[0].remove();
|
|
}
|
|
};
|
|
}
|
|
|
|
/**
|
|
* get a list of domains that are currently maintained as domain stacks
|
|
* @return a map of clear text strings of host names to an integer array: {the size of the domain stack, guessed delta waiting time}
|
|
*/
|
|
@Override
|
|
public Map<String, Integer[]> getDomainStackHosts(RobotsTxt robots) {
|
|
Map<String, Integer[]> map = new TreeMap<String, Integer[]>();
|
|
int delta = Latency.waitingRemainingGuessed(this.hostName, this.hostHash, robots, ClientIdentification.yacyInternetCrawlerAgent);
|
|
map.put(this.hostName, new Integer[]{this.size(), delta});
|
|
return map;
|
|
}
|
|
|
|
/**
|
|
* get lists of crawl request entries for a specific host
|
|
* @param host
|
|
* @param maxcount
|
|
* @param maxtime
|
|
* @return a list of crawl loader requests
|
|
*/
|
|
@Override
|
|
public List<Request> getDomainStackReferences(String host, int maxcount, long maxtime) {
|
|
if (host == null) return new ArrayList<Request>(0);
|
|
if (!this.hostName.equals(host)) return new ArrayList<Request>(0);
|
|
final ArrayList<Request> cel = new ArrayList<Request>(maxcount);
|
|
long timeout = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime;
|
|
Iterator<Request> i;
|
|
try {
|
|
i = this.iterator();
|
|
while (i.hasNext()) {
|
|
Request r = i.next();
|
|
if (r != null) cel.add(r);
|
|
if (System.currentTimeMillis() > timeout || cel.size() >= maxcount) break;
|
|
}
|
|
} catch (IOException e) {
|
|
}
|
|
return cel;
|
|
}
|
|
|
|
}
|