// You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
// yacy_search_server/source/net/yacy/crawler/HostQueue.java
//
// 558 lines
// 22 KiB

/**
* HostQueue
* Copyright 2013 by Michael Christen
* First released 24.09.2013 at http://yacy.net
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program in the file lgpl21.txt
* If not, see <http://www.gnu.org/licenses/>.
*/
package net.yacy.crawler;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Array;
import java.net.MalformedURLException;
import java.util.ArrayList;
/*
 * Commit note attached to this line in the repository viewer (10 years ago):
 *
 * Added a new way of content browsing in search results: date navigation.
 * The date is taken from the CONTENT of the documents / web pages, NOT from a
 * date submitted in the context of metadata (i.e. http header or html head
 * form). This makes it possible to search for documents in the future, i.e.
 * when documents contain event descriptions for future events. The date is
 * written to an index field which is now enabled by default. All documents
 * are scanned for contained date mentions.
 *
 * To visualize the dates for a specific search result, a histogram showing
 * the number of documents for each day is displayed. To render these
 * histograms the morris.js library is used. Morris.js also requires
 * raphael.js, which is now also integrated in YaCy. The histogram is now also
 * displayed in the index browser by default.
 *
 * To select a specific range from a search result, the following modifiers
 * were introduced: from:<date> to:<date>
 * These modifiers can be used separately (i.e. only 'from' or only 'to') to
 * describe an open interval, or combined to have a closed interval. Both
 * dates are inclusive. To select a specific single date only, use the 'to:'
 * modifier.
 *
 * The histogram shows blue and green lines; the green lines denote weekend
 * days (Saturday and Sunday). Clicking on bars in the histogram has the
 * following reaction:
 *   1st click: add a from:<date> modifier for the date of the bar
 *   2nd click: add a to:<date> modifier for the date of the bar
 *   3rd click: remove the from and to modifiers and set an on:<date> for the bar
 * When the on:<date> modifier is used, the histogram shows an unlimited time
 * period. This makes it possible to click again (4th click), which is then
 * interpreted as a 1st click again (sets a from modifier).
 *
 * The display feature is NOT switched on by default; to switch it on use the
 * /ConfigSearchPage_p.html servlet.
 */
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import net.yacy.cora.document.encoding.ASCII;
import net.yacy.cora.document.encoding.UTF8;
import net.yacy.cora.document.id.DigestURL;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.protocol.ClientIdentification;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.crawler.data.CrawlProfile;
import net.yacy.crawler.data.Latency;
import net.yacy.crawler.retrieval.Request;
import net.yacy.crawler.robots.RobotsTxt;
import net.yacy.kelondro.data.word.Word;
import net.yacy.kelondro.index.BufferedObjectIndex;
import net.yacy.kelondro.index.Index;
import net.yacy.kelondro.index.OnDemandOpenFileIndex;
import net.yacy.kelondro.index.Row;
import net.yacy.kelondro.index.RowHandleSet;
import net.yacy.kelondro.table.Table;
import static net.yacy.kelondro.util.FileUtils.deletedelete;
import net.yacy.kelondro.util.kelondroException;
import net.yacy.repository.Blacklist.BlacklistType;
import net.yacy.search.Switchboard;
/**
 * A crawl queue for a single host (host name plus port) that implements
 * {@link Balancer}. Requests are kept in one {@link Index} per crawl depth;
 * each of these "depth stacks" is backed by a file named after the depth
 * with the suffix {@link #indexSuffix} inside a host-specific directory.
 */
public class HostQueue implements Balancer {
private final static ConcurrentLog log = new ConcurrentLog("HostQueue");
// file name suffix of the depth stack files inside the host directory
public static final String indexSuffix = ".stack";
// buffer size argument handed to the Table constructor when a stack is opened
private static final int EcoFSBufferSize = 1000;
// buffer size argument handed to every BufferedObjectIndex wrapping a stack
private static final int objectIndexBufferSize = 1000;
// directory that holds the stack files of this host
private final File hostPath;
// host name of this queue; the first constructor substitutes "localhost" for null
private final String hostName;
// hash computed in init() from host name and port; the empty string if that fails
private String hostHash;
private final int port;
// passed through unchanged to the Table / OnDemandOpenFileIndex constructors
// NOTE(review): the exact meaning of this flag is defined by those classes - confirm there
private final boolean exceed134217727;
// if true, missing or small (< 10000 bytes) stack files are opened as OnDemandOpenFileIndex
private final boolean onDemand;
// crawl depth -> stack; sorted, so the first entry is the lowest depth
private TreeMap<Integer, Index> depthStacks;
/**
 * Opens the queue of one host below a common parent directory. The queue
 * directory is named {@code <hostName>.<port>} and is created by
 * {@code init()} if it does not exist yet.
 *
 * @param hostsPath parent directory of all host queue directories
 * @param hostName name of the host; {@code null} is replaced by "localhost"
 * @param port port of the host
 * @param onDemand see the field of the same name
 * @param exceed134217727 handed through to the stack index implementations
 */
public HostQueue(
        final File hostsPath,
        final String hostName,
        final int port,
        final boolean onDemand,
        final boolean exceed134217727) {
    // might be null (file://) but hostqueue needs a name (for queue file)
    String queueHost = hostName;
    if (queueHost == null) {
        queueHost = "localhost";
    }
    this.hostName = queueHost;
    this.port = port;
    this.hostPath = new File(hostsPath, queueHost + "." + port);
    this.exceed134217727 = exceed134217727;
    this.onDemand = onDemand;
    init();
}
/**
 * Re-opens a queue from its directory. Host name and port are recovered
 * from the directory name, which must have the form {@code <hostName>.<port>}.
 *
 * @param hostPath the directory of this host queue
 * @param onDemand see the field of the same name
 * @param exceed134217727 handed through to the stack index implementations
 * @throws RuntimeException if the directory name contains no dot
 * @throws NumberFormatException if the part after the last dot is not a
 *         number (e.g. a directory called "host.com" without a port)
 */
public HostQueue(
        final File hostPath,
        final boolean onDemand,
        final boolean exceed134217727) {
    this.onDemand = onDemand;
    this.exceed134217727 = exceed134217727;
    this.hostPath = hostPath;

    // split "<hostName>.<port>" at the last dot
    final String dirName = hostPath.getName();
    final int dot = dirName.lastIndexOf('.');
    if (dot < 0) {
        throw new RuntimeException("hostPath name must contain a dot: " + dirName);
    }
    final String portPart = dirName.substring(dot + 1);
    this.hostName = dirName.substring(0, dot);
    this.port = Integer.parseInt(portPart);
    init();
}
/**
 * Shared constructor tail: computes the host hash, makes sure the queue
 * directory exists and opens every stack file found in it.
 */
private final void init() {
    String hash;
    try {
        hash = (this.hostName == null) ? "" : DigestURL.hosthash(this.hostName, this.port);
    } catch (MalformedURLException e) {
        // no hash can be computed for this host/port; fall back to the empty hash
        hash = "";
    }
    this.hostHash = hash;

    if (!this.hostPath.exists()) {
        this.hostPath.mkdirs();
    }
    this.depthStacks = new TreeMap<Integer, Index>();
    final int urlCount = openAllStacks();
    if (log.isInfo()) {
        log.info("opened HostQueue " + this.hostPath.getAbsolutePath() + " with " + urlCount + " urls.");
    }
}
/**
 * @return the host name this queue was created for
 */
public String getHost() {
    final String host = this.hostName;
    return host;
}
/**
 * @return the port this queue was created for
 */
public int getPort() {
    final int hostPort = this.port;
    return hostPort;
}
/**
 * Opens every file in the host directory whose name ends with
 * {@link #indexSuffix} and whose prefix parses as an integer depth.
 * Non-empty stacks are registered in {@code depthStacks}; empty ones are
 * closed and their file is deleted.
 *
 * @return the total number of entries in all registered stacks
 */
private int openAllStacks() {
    final String[] names = this.hostPath.list();
    if (names == null) {
        return 0;
    }
    int total = 0;
    for (final String name : names) {
        if (!name.endsWith(indexSuffix)) {
            continue;
        }
        try {
            final String depthPart = name.substring(0, name.length() - indexSuffix.length());
            final int depth = Integer.parseInt(depthPart);
            final File stackFile = new File(this.hostPath, name);
            final Index stack = openStack(stackFile);
            if (stack == null) {
                continue;
            }
            final int entries = stack.size();
            if (entries == 0) {
                // nothing to crawl at this depth: drop the empty file
                stack.close();
                deletedelete(stackFile);
            } else {
                this.depthStacks.put(depth, stack);
                total += entries;
            }
        } catch (NumberFormatException e) {}
    }
    return total;
}
/**
 * Finds the lowest crawl depth that still has entries. Empty stacks that
 * are encountered on the way are closed, their file is deleted and they
 * are removed from the map.
 *
 * @return the lowest depth with a non-empty stack, or 0 if no stack is left
 */
public synchronized int getLowestStackDepth() {
    Map.Entry<Integer, Index> lowest;
    while ((lowest = this.depthStacks.firstEntry()) != null) {
        final Index stack = lowest.getValue();
        if (stack.size() != 0) {
            return lowest.getKey();
        }
        // exhausted stack: clean it up and look at the next depth
        stack.close();
        deletedelete(getFile(lowest.getKey()));
        this.depthStacks.remove(lowest.getKey());
    }
    // no stack left; this happens e.g. after a deletion
    return 0;
}
/**
 * Finds the stack with the lowest crawl depth that still has entries.
 * Empty stacks that are encountered on the way are closed, their file is
 * deleted and they are removed from the map.
 *
 * @return the non-empty stack with the lowest depth, or null if there is none
 */
private Index getLowestStack() {
    while (!this.depthStacks.isEmpty()) {
        final Map.Entry<Integer, Index> lowest;
        synchronized (this) {
            lowest = this.depthStacks.firstEntry();
        }
        if (lowest == null) {
            // happens only if map is empty
            return null;
        }
        final Index stack = lowest.getValue();
        if (stack.size() != 0) {
            return stack;
        }
        final Integer depth = lowest.getKey();
        stack.close();
        deletedelete(getFile(depth));
        this.depthStacks.remove(depth);
    }
    return null;
}
/**
 * Returns the stack for the given crawl depth and opens (creates) it first
 * if it is not registered yet.
 *
 * @param depth the crawl depth
 * @return the stack for that depth, or null if the stack could not be opened
 */
private Index getStack(int depth) {
    synchronized (this) {
        Index stack = this.depthStacks.get(depth);
        if (stack == null) {
            // not known yet: open the backing file and register the stack
            stack = openStack(getFile(depth));
            if (stack != null) {
                this.depthStacks.put(depth, stack);
            }
        }
        return stack;
    }
}
/**
 * Builds the stack file for a crawl depth: the decimal depth, left-padded
 * with '0' to at least four characters, followed by {@link #indexSuffix}.
 *
 * @param depth the crawl depth
 * @return the stack file inside the host directory
 */
private File getFile(int depth) {
    final StringBuilder name = new StringBuilder(Integer.toString(depth));
    while (name.length() < 4) {
        name.insert(0, '0');
    }
    name.append(indexSuffix);
    return new File(this.hostPath, name.toString());
}
/**
 * Opens the index stored in the given file, trying up to ten times.
 * In on-demand mode a missing or small (less than 10000 bytes) file is
 * opened as an {@link OnDemandOpenFileIndex}; otherwise a {@link Table} is
 * used, first with the regular buffer size and, if space is exceeded,
 * once more with a buffer size of 0.
 *
 * @param f the stack file
 * @return the opened index wrapped in a {@link BufferedObjectIndex}, or
 *         null if every attempt failed
 */
private Index openStack(File f) {
    int attempt = 0;
    while (attempt < 10) {
        // we try that again if it fails because it shall not fail
        attempt++;
        final boolean useTable = !this.onDemand || (f.exists() && f.length() >= 10000);
        if (useTable) {
            try {
                final Table table = new Table(f, Request.rowdef, EcoFSBufferSize, 0, false, this.exceed134217727, true);
                return new BufferedObjectIndex(table, objectIndexBufferSize);
            } catch (final SpaceExceededException e) {
                // second chance without a buffer
                try {
                    final Table unbuffered = new Table(f, Request.rowdef, 0, 0, false, this.exceed134217727, true);
                    return new BufferedObjectIndex(unbuffered, objectIndexBufferSize);
                } catch (final SpaceExceededException e1) {
                    ConcurrentLog.logException(e1);
                }
            } catch (kelondroException e) {
                // possibly the file was closed meanwhile
                ConcurrentLog.logException(e);
            }
        } else {
            try {
                final OnDemandOpenFileIndex lazy = new OnDemandOpenFileIndex(f, Request.rowdef, this.exceed134217727);
                return new BufferedObjectIndex(lazy, objectIndexBufferSize);
            } catch (kelondroException e) {
                // possibly the file was closed meanwhile
                ConcurrentLog.logException(e);
            }
        }
    }
    return null;
}
/**
 * Closes all stacks. Files of stacks that were empty are deleted, and the
 * host directory itself is deleted when nothing is left in it.
 */
@Override
public synchronized void close() {
    final Iterator<Map.Entry<Integer, Index>> stacks = this.depthStacks.entrySet().iterator();
    while (stacks.hasNext()) {
        final Map.Entry<Integer, Index> e = stacks.next();
        final Index stack = e.getValue();
        // the size must be read before the stack is closed
        final boolean wasEmpty = stack.size() == 0;
        stack.close();
        if (wasEmpty) {
            deletedelete(getFile(e.getKey()));
        }
    }
    this.depthStacks.clear();

    final String[] remaining = this.hostPath.list();
    if (remaining == null || remaining.length == 0) {
        deletedelete(this.hostPath);
    }
}
/**
 * Discards the whole queue: closes and deletes every stack, then deletes
 * whatever else is left in the host directory and the directory itself.
 */
@Override
public synchronized void clear() {
    final Iterator<Map.Entry<Integer, Index>> stacks = this.depthStacks.entrySet().iterator();
    while (stacks.hasNext()) {
        final Map.Entry<Integer, Index> e = stacks.next();
        e.getValue().close();
        deletedelete(getFile(e.getKey()));
    }
    this.depthStacks.clear();

    final String[] leftovers = this.hostPath.list();
    if (leftovers != null) {
        for (int i = 0; i < leftovers.length; i++) {
            deletedelete(new File(this.hostPath, leftovers[i]));
        }
    }
    deletedelete(this.hostPath);
}
/**
 * Looks up a crawl request by its url hash in all depth stacks.
 *
 * @param urlhash the hash of the url to look for, must not be null
 * @return the request stored for that hash, or null if no stack contains
 *         it (or if the queue has no stacks, which occurs during shutdown)
 * @throws IOException if the stored row cannot be read or converted into a request
 */
@Override
public Request get(final byte[] urlhash) throws IOException {
    assert urlhash != null;
    if (this.depthStacks == null) return null; // case occurs during shutdown
    for (final Index depthStack : this.depthStacks.values()) {
        final Row.Entry entry = depthStack.get(urlhash, false);
        // a miss in this stack says nothing about the other depths: keep searching
        if (entry != null) return new Request(entry);
    }
    return null;
}
/**
 * Removes every request that belongs to the given crawl profile.
 * For each depth stack the matching url hashes are collected first and
 * removed afterwards (presumably so that a stack is not modified while its
 * rows are being iterated).
 *
 * @param profileHandle the handle of the crawl profile whose requests shall be removed
 * @param timeout maximum scan time in milliseconds; a value &lt;= 0 or
 *        {@code Long.MAX_VALUE} means no time limit
 * @return the number of remove calls issued (one per collected url hash)
 * @throws IOException
 * @throws SpaceExceededException
 */
@Override
public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException {
    final boolean limited = timeout > 0 && timeout != Long.MAX_VALUE;
    final long terminate = limited ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
    int count = 0;
    synchronized (this) {
        for (final Index depthStack : this.depthStacks.values()) {
            // first find a list of url hashes that shall be deleted
            final HandleSet doomed = new RowHandleSet(Word.commonHashLength, Base64Order.enhancedCoder, 100);
            for (final Iterator<Row.Entry> rows = depthStack.rows(); rows.hasNext() && System.currentTimeMillis() < terminate; ) {
                final Request candidate = new Request(rows.next());
                if (candidate.profileHandle().equals(profileHandle)) {
                    doomed.put(candidate.url().hash());
                }
                if (System.currentTimeMillis() > terminate) {
                    break;
                }
            }
            // then delete them from this stack
            for (final byte[] urlhash : doomed) {
                depthStack.remove(urlhash);
                count++;
            }
        }
    }
    return count;
}
/**
 * Deletes all urls of this queue if the hash of its host is among the
 * given host hashes; otherwise the queue is left untouched.
 * @param hosthashes the host hashes whose urls shall be deleted
 * @return number of deleted urls (the queue size before clearing), or 0 if
 *         the host of this queue is not listed
 */
@Override
public int removeAllByHostHashes(final Set<String> hosthashes) {
    boolean ownHostListed = false;
    final Iterator<String> candidates = hosthashes.iterator();
    while (!ownHostListed && candidates.hasNext()) {
        ownHostListed = this.hostHash.equals(candidates.next());
    }
    if (!ownHostListed) {
        return 0;
    }
    final int removed = this.size();
    this.clear();
    return removed;
}
/**
 * Remove urls from the queue. Every depth stack is searched, so entries are
 * removed regardless of the crawl depth they were stored at.
 * @param urlHashes a list of hashes that shall be removed
 * @return number of entries that had been removed from all stacks together
 * @throws IOException
 */
@Override
public synchronized int remove(final HandleSet urlHashes) throws IOException {
    int removedCounter = 0;
    for (final Index depthStack : this.depthStacks.values()) {
        final int sizeBefore = depthStack.size();
        // counted per stack, so that the consistency check below compares like with like
        int removedHere = 0;
        for (final byte[] urlhash : urlHashes) {
            final Row.Entry entry = depthStack.remove(urlhash);
            if (entry != null) removedHere++;
        }
        assert depthStack.size() + removedHere == sizeBefore : "urlFileIndex.size() = " + depthStack.size() + ", s = " + sizeBefore;
        // a stack without hits must not end the search: the hashes may live at a deeper level
        removedCounter += removedHere;
    }
    return removedCounter;
}
@Override
public boolean has(final byte[] urlhashb) {
added a new way of content browsing in search results: - date navigation The date is taken from the CONTENT of the documents / web pages, NOT from a date submitted in the context of metadata (i.e. http header or html head form). This makes it possible to search for documents in the future, i.e. when documents contain event descriptions for future events. The date is written to an index field which is now enabled by default. All documents are scanned for contained date mentions. To visualize the dates for a specific search results, a histogram showing the number of documents for each day is displayed. To render these histograms the morris.js library is used. Morris.js requires also raphael.js which is now also integrated in YaCy. The histogram is now also displayed in the index browser by default. To select a specific range from a search result, the following modifiers had been introduced: from:<date> to:<date> These modifiers can be used separately (i.e. only 'from' or only 'to') to describe an open interval or combined to have a closed interval. Both dates are inclusive. To select a specific single date only, use the 'to:' - modifier. The histogram shows blue and green lines; the green lines denot weekend days (saturday and sunday). Clicking on bars in the histogram has the following reaction: 1st click: add a from:<date> modifier for the date of the bar 2nd click: add a to:<date> modifier for the date of the bar 3rd click: remove from and date modifier and set a on:<date> for the bar When the on:<date> modifier is used, the histogram shows an unlimited time period. This makes it possible to click again (4th click) which is then interpreted as a 1st click again (sets a from modifier). The display feature is NOT switched on by default; to switch it on use the /ConfigSearchPage_p.html servlet.
10 years ago
for (int retry = 0; retry < 3; retry++) {
try {
for (Index depthStack: this.depthStacks.values()) {
if (depthStack.has(urlhashb)) return true;
}
return false;
} catch (ConcurrentModificationException e) {}
}
return false;
}
/**
 * @return the number of entries in all depth stacks together
 */
@Override
public int size() {
    int total = 0;
    final Iterator<Index> stacks = this.depthStacks.values().iterator();
    while (stacks.hasNext()) {
        total += stacks.next().size();
    }
    return total;
}
/**
 * @return true if no depth stack holds an entry (also if there is no stack at all)
 */
@Override
public boolean isEmpty() {
    final Iterator<Index> stacks = this.depthStacks.values().iterator();
    while (stacks.hasNext()) {
        final Index stack = stacks.next();
        if (!stack.isEmpty()) {
            return false;
        }
    }
    return true;
}
/**
 * Adds a crawl request to the stack of its crawl depth.
 *
 * @param entry the request to enqueue, must not be null
 * @param profile the crawl profile of the request; if it is not null and
 *        limits the pages per domain, its counter for the host of the
 *        request is increased
 * @param robots not used by this implementation
 * @return null if the request was added, otherwise a text explaining why
 *         it was rejected
 * @throws IOException if the stack for the depth of the request cannot be
 *         opened or the entry cannot be written
 * @throws SpaceExceededException
 */
@Override
public String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException {
    assert entry != null;
    final byte[] hash = entry.url().hash();
    synchronized (this) {
        // double-check
        if (this.has(hash)) return "double occurrence in urlFileIndex";

        // get the target stack first: opening it may fail, and in that case
        // neither the domain counter nor any index must be touched
        final Index depthStack = getStack(entry.depth());
        if (depthStack == null) {
            throw new IOException("cannot open stack file " + getFile(entry.depth()) + " of host queue " + this.hostPath);
        }

        // increase dom counter
        if (profile != null) {
            final int maxPages = profile.domMaxPages();
            if (maxPages != Integer.MAX_VALUE && maxPages > 0) {
                profile.domInc(entry.url().getHost());
            }
        }

        // add to index
        final int s = depthStack.size();
        depthStack.put(entry.toRow());
        assert s < depthStack.size() : "hash = " + ASCII.String(hash) + ", s = " + s + ", size = " + depthStack.size();
        assert depthStack.has(hash) : "hash = " + ASCII.String(hash);
    }
    return null;
}
/**
 * Takes the next crawl request from the non-empty stack with the lowest
 * depth. Entries whose url is on the crawler blacklist, or whose crawl
 * profile cannot be found in the given switchboard, have already been
 * removed from the stack at that point and are dropped; the next entry is
 * tried instead.
 *
 * @param delay if true and a sleep time &gt; 0 was computed for the host,
 *        the calling thread waits that long before the request is returned
 * @param cs source of the crawl profiles
 * @param robots handed to the Latency methods
 * @return the next request, or null if no stack has an entry left
 * @throws IOException
 */
@Override
public Request pop(boolean delay, CrawlSwitchboard cs, RobotsTxt robots) throws IOException {
// returns a crawl entry from the stack and ensures minimum delta times
long sleeptime = 0;
Request crawlEntry = null;
CrawlProfile profileEntry = null;
synchronized (this) {
mainloop: while (true) {
// getLowestStack() also cleans up exhausted stacks; null means the queue is empty
Index depthStack = getLowestStack();
if (depthStack == null) return null;
Row.Entry rowEntry = null;
// removeOne() takes the entry out of the stack, so everything skipped below is gone for good
while (depthStack.size() > 0) {
rowEntry = depthStack.removeOne();
if (rowEntry != null) break;
}
if (rowEntry == null) continue mainloop;
crawlEntry = new Request(rowEntry);
// check blacklist (again) because the user may have created blacklist entries after the queue has been filled
if (Switchboard.urlBlacklist.isListed(BlacklistType.CRAWLER, crawlEntry.url())) {
if (log.isFine()) log.fine("URL '" + crawlEntry.url() + "' is in blacklist.");
continue mainloop;
}
// at this point we must check if the crawlEntry has relevance because the crawl profile still exists
// if not: the entry is dropped and the next entry is tried
profileEntry = cs.get(UTF8.getBytes(crawlEntry.profileHandle()));
if (profileEntry == null) {
if (log.isFine()) log.fine("no profile entry for handle " + crawlEntry.profileHandle());
continue mainloop;
}
// depending on the caching policy we need sleep time to avoid DoS-like situations
sleeptime = Latency.getDomainSleepTime(robots, profileEntry, crawlEntry.url());
break;
}
}
// NOTE(review): the loop above is only left via 'break' with crawlEntry and profileEntry set,
// or via 'return null'; the null checks below therefore look unreachable - confirm before removing
if (crawlEntry == null) return null;
ClientIdentification.Agent agent = profileEntry == null ? ClientIdentification.yacyInternetCrawlerAgent : profileEntry.getAgent();
long robotsTime = Latency.getRobotsTime(robots, crawlEntry.url(), agent);
Latency.updateAfterSelection(crawlEntry.url(), profileEntry == null ? 0 : robotsTime);
if (delay && sleeptime > 0) {
// force a busy waiting here
// in best case, this should never happen if the balancer works properly
// this is only to protection against the worst case, where the crawler could
// behave in a DoS-manner
if (log.isInfo()) log.info("forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ": " + Latency.waitingRemainingExplain(crawlEntry.url(), robots, agent));
// split the sleep time into whole seconds (loops) and a remainder (rest);
// delays below three seconds are waited in one piece
long loops = sleeptime / 1000;
long rest = sleeptime % 1000;
if (loops < 3) {
rest = rest + 1000 * loops;
loops = 0;
}
// the thread name is changed here and not restored afterwards
Thread.currentThread().setName("Balancer waiting for " + crawlEntry.url().getHost() + ": " + sleeptime + " milliseconds");
synchronized(this) {
// must be synchronized here to avoid 'takeover' moves from other threads which then idle the same time which would not be enough
// an InterruptedException ends only the current wait slice; the interrupt itself is discarded
if (rest > 0) {try {this.wait(rest);} catch (final InterruptedException e) {}}
for (int i = 0; i < loops; i++) {
if (log.isInfo()) log.info("waiting for " + crawlEntry.url().getHost() + ": " + (loops - i) + " seconds remaining...");
try {this.wait(1000); } catch (final InterruptedException e) {}
}
}
// second update, now after the forced delay has passed
Latency.updateAfterSelection(crawlEntry.url(), robotsTime);
}
return crawlEntry;
}
/**
 * Iterates over all requests of this queue, depth stack by depth stack in
 * ascending depth order.
 * <p>
 * {@code hasNext()} answers true as long as there are unvisited stacks,
 * even if those turn out to be empty; {@code next()} returns null in that
 * case, and also whenever reading an entry fails for any reason. Callers
 * must therefore be prepared for null elements.
 *
 * @return an iterator over the queued requests
 * @throws IOException
 */
@Override
public Iterator<Request> iterator() throws IOException {
    final Iterator<Map.Entry<Integer, Index>> depthIterator = this.depthStacks.entrySet().iterator();
    return new Iterator<Request>() {
        // row iterator of the stack that is currently being walked; null before the first next()
        private Iterator<Row.Entry> rows = null;

        @Override
        public boolean hasNext() {
            if (depthIterator.hasNext()) {
                return true;
            }
            return this.rows != null && this.rows.hasNext();
        }

        @Override
        public Request next() {
            synchronized (HostQueue.this) {
                try {
                    // advance to the next stack that still has rows
                    while (this.rows == null || !this.rows.hasNext()) {
                        this.rows = depthIterator.next().getValue().iterator();
                    }
                    if (!this.rows.hasNext()) {
                        return null;
                    }
                    final Row.Entry rowEntry = this.rows.next();
                    return rowEntry == null ? null : new Request(rowEntry);
                } catch (Throwable e) {
                    return null;
                }
            }
        }

        @Override
        public void remove() {
            this.rows.remove();
        }
    };
}
/**
 * Get the domains that are maintained by this balancer. A host queue
 * serves exactly one host, so the map has a single entry.
 * @param robots handed to the waiting time estimation
 * @return a map from the clear text host name to an integer array: {the size of the queue, guessed delta waiting time}
 */
@Override
public Map<String, Integer[]> getDomainStackHosts(RobotsTxt robots) {
    final int waitingGuess = Latency.waitingRemainingGuessed(this.hostName, this.hostHash, robots, ClientIdentification.yacyInternetCrawlerAgent);
    final Integer[] sizeAndDelta = new Integer[2];
    sizeAndDelta[0] = this.size();
    sizeAndDelta[1] = waitingGuess;

    final Map<String, Integer[]> hosts = new TreeMap<String, Integer[]>();
    hosts.put(this.hostName, sizeAndDelta);
    return hosts;
}
/**
 * Get a list of crawl request entries for a specific host.
 * @param host the host name; only the host of this queue yields results
 * @param maxcount maximum number of requests to return; values &lt;= 0 yield an empty list
 * @param maxtime maximum collection time in milliseconds; {@code Long.MAX_VALUE} means no limit.
 *        The time is checked after each element, so at least one element
 *        is returned if the queue is not empty.
 * @return a list of crawl loader requests, never null
 */
@Override
public List<Request> getDomainStackReferences(String host, int maxcount, long maxtime) {
    if (host == null || maxcount <= 0 || !this.hostName.equals(host)) return new ArrayList<Request>(0);

    // maxcount is only an upper bound and may be huge; do not allocate that many slots up front
    final ArrayList<Request> cel = new ArrayList<Request>(Math.min(maxcount, 100));

    // guard the addition against overflow for very large (but not MAX_VALUE) time limits
    final long now = System.currentTimeMillis();
    final long timeout = (maxtime >= Long.MAX_VALUE - now) ? Long.MAX_VALUE : now + maxtime;
    try {
        final Iterator<Request> i = this.iterator();
        while (cel.size() < maxcount && i.hasNext()) {
            // the iterator of this class may deliver null elements; skip them
            final Request r = i.next();
            if (r != null) cel.add(r);
            if (System.currentTimeMillis() > timeout) break;
        }
    } catch (IOException e) {
        // return what has been collected so far, but leave a trace of the failure
        ConcurrentLog.logException(e);
    }
    return cel;
}
/**
 * Not implemented for host queues.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public int getOnDemandLimit() {
    final String reason = "Not supported yet.";
    throw new UnsupportedOperationException(reason);
}
/**
 * @return the exceed134217727 flag this queue was constructed with
 */
@Override
public boolean getExceed134217727() {
    final boolean flag = this.exceed134217727;
    return flag;
}
}