- added streaming support to the CrawlURLFetchStack_p servlet

- fixed an NPE bug in list.java
- use more constants

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@3373 6c8d7289-2bf4-0310-a012-ef5d649a1542
karlchenofhell 18 years ago
parent 65af9d3215
commit c016fcb10f
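
The new .stream interface below speaks a simple line-based protocol: the client announces "ADD URLS: <count>", the servlet answers "OK" ("FAILED" if the count does not parse), the client sends one URL per line, and once all announced URLs have arrived the servlet reports "FAILED URLS: <failed indices> of <count>"; "END" closes the exchange. A minimal client-side sketch, assuming reader/writer are already connected to /CrawlURLFetchStack_p.stream (HTTP handshake and transport elided):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintWriter;

public class URLFetchStackStreamClientSketch {
    // Pushes urls over an established connection to CrawlURLFetchStack_p.stream;
    // opening the connection and consuming the HTTP response header are elided/assumed.
    public static void pushURLs(BufferedReader in, PrintWriter out, String[] urls) throws IOException {
        out.println("ADD URLS: " + urls.length);   // STREAM_CMD_ADDURLS_ + count
        out.flush();
        if (!"OK".equals(in.readLine())) return;   // STREAM_RESP_OK, or STREAM_RESP_FAILED on a bad count
        for (int i = 0; i < urls.length; i++)
            out.println(urls[i]);                  // one URL per line, validated server-side by addURL()
        out.flush();
        System.out.println(in.readLine());         // summary, e.g. "FAILED URLS: 1 of 2"
        out.println("END");                        // STREAM_CMD_END ends the session
        out.flush();
    }
}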

@@ -39,8 +39,13 @@
// the intact and unchanged copyright notice.
// Contributions and changes to the program code must be marked as such.
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.MalformedURLException;
import java.util.HashMap;
@@ -73,74 +78,133 @@ public class CrawlURLFetchStack_p {
return stack;
}
public static final String STREAM_CMD_ADDURLS_ = "ADD URLS: ";
public static final String STREAM_CMD_END = "END";
public static final String STREAM_RESP_OK_ADDURLS_ = "FAILED URLS: ";
public static final String STREAM_RESP_OK = "OK";
public static final String STREAM_RESP_FAILED = "FAILED";
public static serverObjects respond(httpHeader header, serverObjects post, serverSwitch env) {
final serverObjects prop = new serverObjects();
plasmaSwitchboard sb = (plasmaSwitchboard)env;
if (((String)header.get(httpHeader.CONNECTION_PROP_PATH)).endsWith(".stream")) {
/* =================================================================
* .stream request
* ================================================================= */
InputStream in = (InputStream)header.get(httpHeader.CONNECTION_PROP_INPUTSTREAM);
OutputStream out = (OutputStream)header.get(httpHeader.CONNECTION_PROP_OUTPUTSTREAM);
BufferedReader inrb = new BufferedReader(new InputStreamReader(in));
PrintWriter outw = new PrintWriter(out);
String line;
int addurls = 0, cururl = 0;
boolean[] status = new boolean[0];
URLFetcherStack stack = getURLFetcherStack(env);
try {
while ((line = inrb.readLine()) != null) {
// commands
if (line.startsWith(STREAM_CMD_ADDURLS_)) {
try {
addurls = Integer.parseInt(line.substring(STREAM_CMD_ADDURLS_.length()));
status = new boolean[addurls];
cururl = 0;
outw.println(STREAM_RESP_OK);
} catch (NumberFormatException e) {
outw.println(STREAM_RESP_FAILED);
}
} else if (line.equals(STREAM_CMD_END)) {
break;
} else {
if (cururl < addurls) // add url
status[cururl++] = addURL(line, stack);
if (cururl > 0 && cururl == addurls ) {
// all announced URLs have been received, now write a status summary: the failed indices 'of' the total, e.g. 'FAILED URLS: 5 of 8'
outw.print(STREAM_RESP_OK_ADDURLS_);
StringBuffer stat = new StringBuffer();
for (int i=0; i<status.length; i++)
if (!status[i]) stat.append(i).append(", ");
// strip the trailing ", " - only if at least one URL failed, the substring call would throw otherwise
if (stat.length() > 1) outw.print(stat.substring(0, stat.length() - 2));
outw.print(" of ");
outw.println(status.length);
cururl = 0;
addurls = 0;
}
}
}
} catch (IOException e) { e.printStackTrace(); }
outw.flush();
return null;
} else {
/* =================================================================
* 'normal' request
* ================================================================= */
if (post != null) {
if (post.containsKey("addurls")) {
prop.put("addedUrls", 1);
prop.put("addedUrls_added", addURLs(post, post.getInt("addurls", -1), getURLFetcherStack(env)));
}
else if (post.containsKey("setMaxSize")) {
final int count = post.getInt("maxSize", maxURLsPerFetch);
if (count > 0) {
maxURLsPerFetch = count;
prop.put("set", 1);
prop.put("set_value", maxURLsPerFetch);
} else {
prop.put("set", 2);
prop.put("set_value", count);
}
}
else if (post.containsKey("shiftlcq")) {
final int count = Math.min(post.getInt("shiftloc", 0), sb.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE));
final int failed = shiftFromNotice(sb.noticeURL, plasmaCrawlNURL.STACK_TYPE_CORE, getURLFetcherStack(env), count);
prop.put("shiftloc", 1);
prop.put("shiftloc_value", count - failed);
prop.put("shiftloc_failed", failed);
}
else if (post.containsKey("shiftrcq")) {
final int count = post.getInt("shiftrem", 0);
final int failed = shiftFromNotice(sb.noticeURL, plasmaCrawlNURL.STACK_TYPE_LIMIT, getURLFetcherStack(env), count);
prop.put("shiftrem", 1);
prop.put("shiftrem_value", count - failed);
prop.put("shiftrem_failed", failed);
}
else if (post.containsKey("subupload")) {
if (post.get("upload", "").length() == 0) {
prop.put("uploadError", 1);
} else {
final File file = new File(post.get("upload", ""));
final String content = new String((byte[])post.get("upload$file"));
final String type = post.get("uploadType", "");
if (type.equals("plain")) {
prop.put("upload_added", addURLs(content.split("\n"), getURLFetcherStack(env)));
prop.put("upload_failed", 0);
prop.put("upload", 1);
prop.put("upload_added", added);
prop.put("upload_failed", failed);
} catch (Exception e) {
e.printStackTrace();
prop.put("upload", 2);
prop.put("upload_error", e.getMessage());
} else if (type.equals("html")) {
try {
final htmlFilterContentScraper scraper = new htmlFilterContentScraper(new URL(file));
final Writer writer = new htmlFilterWriter(null, null, scraper, null, false);
serverFileUtils.write(content, writer);
writer.close();
final Iterator it = ((HashMap)scraper.getAnchors()).keySet().iterator();
int added = 0, failed = 0;
String url;
while (it.hasNext()) try {
url = (String)it.next();
getURLFetcherStack(env).push(new URL(url));
added++;
} catch (MalformedURLException e) { failed++; }
prop.put("upload", 1);
prop.put("upload_added", added);
prop.put("upload_failed", failed);
} catch (Exception e) {
e.printStackTrace();
prop.put("upload", 2);
prop.put("upload_error", e.getMessage());
}
}
}
}
@@ -156,7 +220,6 @@ public class CrawlURLFetchStack_p {
prop.put("remurls", sb.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_LIMIT));
prop.put("locurlsVal", Math.min(sb.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE), 500));
prop.put("remurlsVal", Math.min(sb.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_LIMIT), 500));
return prop;
}
@@ -174,14 +237,19 @@ public class CrawlURLFetchStack_p {
private static int addURLs(String[] urls, URLFetcherStack stack) {
int count = -1;
for (int i=0; i<urls.length; i++) try {
if (urls[i].length() == 0) continue;
stack.push(new URL(urls[i]));
count++;
} catch (MalformedURLException e) { /* ignore this */ }
for (int i=0; i<urls.length; i++)
if (addURL(urls[i], stack)) count++;
return count;
}
private static boolean addURL(String url, URLFetcherStack stack) {
try {
if (url == null || url.length() == 0) return false;
stack.push(new URL(url));
return true;
} catch (MalformedURLException e) { return false; }
}
private static int shiftFromNotice(plasmaCrawlNURL nurl, int fromStackType, URLFetcherStack stack, int count) {
plasmaCrawlNURL.Entry entry;
int failed = 0;
@@ -196,7 +264,7 @@ public class CrawlURLFetchStack_p {
int count = 0;
String url;
for (int i=0; i<amount; i++) {
url = post.get("url" + count++, null);
url = post.get("url" + i, null);
if (url == null || url.length() == 0) continue;
try {
stack.push(new URL(url));

@@ -87,6 +87,11 @@
<li><span class="error">#[reason]#</span>: <a href="#[url]#">#[url]#</a></li>#{/error}#
</ul>
</dd>
<dt><label for="newDelay">Re-set delay</label>:</dt>
<dd>
<input type="text" name="newDelay" id="newDelay" maxlength="2" size="6" value="#[curDelay]#" style="text-align: right;" /> minutes
<input type="submit" name="resetDelay" value="Set new delay" />
</dd>
<dt>#(status)#
<input type="submit" name="stop" value="Stop Thread" />::
<input type="submit" name="restart" value="Restart Thread" />::

@@ -224,6 +224,15 @@ public class CrawlURLFetch_p {
prop.put("threadError", ERR_THREAD_RESUME);
}
}
else if (post.containsKey("resetDelay")) {
final long frequency = getDate(post.get("newDelay", ""), "minutes");
if (frequency == -1) {
prop.put("freqError", ERR_DATE);
} else {
fetcher.delay = frequency;
}
}
prop.put("LOCATION", "/CrawlURLFetch_p.html");
}
if (fetcher != null) {
@@ -238,6 +247,7 @@ public class CrawlURLFetch_p {
prop.put("runs_lastFetchedURLs", fetcher.lastFetchedURLs);
prop.put("runs_lastServerResponse", (fetcher.lastServerResponse == null)
? "" : fetcher.lastServerResponse);
prop.put("runs_curDelay", (int)(fetcher.delay / 60000));
Iterator it = fetcher.failed.keySet().iterator();
int i = 0;
@@ -265,28 +275,34 @@ public class CrawlURLFetch_p {
private static int listPeers(serverObjects prop, boolean checkURLCount, httpRemoteProxyConfig theRemoteProxyConfig) {
int peerCount = 0;
TreeMap hostList = new TreeMap();
String peername;
if (yacyCore.seedDB != null && yacyCore.seedDB.sizeConnected() > 0) {
final Enumeration e = yacyCore.seedDB.seedsConnected(true, false, null, yacyVersion.YACY_PROVIDES_CRAWLS_VIA_LIST_HTML);
int dbsize;
while (e.hasMoreElements()) {
yacySeed seed = (yacySeed) e.nextElement();
if (seed != null && !seed.hash.equals(yacyCore.seedDB.mySeed.hash)) {
peername = seed.get(yacySeed.NAME, "nameless");
if (checkURLCount && (dbsize = getURLs2Fetch(seed, theRemoteProxyConfig)) > 0) {
hostList.put(peername + " (" + dbsize + ")", seed.hash);
} else {
hostList.put(peername, seed.hash);
}
}
}
}
if (hostList.size() > 0) {
while (!hostList.isEmpty() && (peername = (String) hostList.firstKey()) != null) {
final String hash = (String) hostList.get(peername);
prop.put("peersKnown_peers_" + peerCount + "_hash", hash);
prop.put("peersKnown_peers_" + peerCount + "_name", peername);
hostList.remove(peername);
peerCount++;
}
prop.put("peersKnown_peers", peerCount);
prop.put("peersKnown", 1);
} else {
prop.put("peersKnown", 0);
}
@@ -304,7 +320,7 @@ public class CrawlURLFetch_p {
if (answer.matches("\\d+"))
return Integer.parseInt(answer);
else {
System.err.println("RETRIEVED INVALID ANSWER FROM " + seed.getName() + ": '" + answer + "'");
serverLog.logFine("URLFETCHER", "Retrieved invalid answer from " + seed.getName() + ": '" + answer + "'");
return -1;
}
} catch (MalformedURLException e) {
@@ -348,7 +364,7 @@ public class CrawlURLFetch_p {
public final URL url;
public final int count;
public final long delay;
public long delay;
public final plasmaSwitchboard sb;
public final plasmaCrawlProfile.entry profile;
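
Dropping the final modifier on delay (above) is what lets the new resetDelay handler change the fetch interval at runtime: the field holds milliseconds, while the form and the runs_curDelay template value work in minutes. A sketch of the round-trip, assuming getDate(value, "minutes") reduces to the same 60000 factor (its internals are not shown in this commit):

// Hypothetical helpers mirroring the minutes <-> milliseconds round-trip.
static long minutesToMillis(String formValue) {
    try {
        return Long.parseLong(formValue.trim()) * 60000L; // "newDelay" form field: minutes -> ms
    } catch (NumberFormatException e) {
        return -1;                                        // mirrors the ERR_DATE error path
    }
}
static int millisToMinutes(long delay) {
    return (int) (delay / 60000);                         // ms -> minutes, as for "runs_curDelay"
}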

@@ -51,6 +51,7 @@
// contains contributions by [FB] to support listing URLs for URL Fetcher
import java.io.File;
import java.net.InetAddress;
import de.anomic.data.URLFetcherStack;
import de.anomic.data.listManager;
@@ -75,8 +76,14 @@ public final class list {
final String col = post.get("col", "");
final File listsPath = new File(ss.getRootPath(),ss.getConfig("listsPath", "DATA/LISTS"));
final String otherPeerName = yacyCore.seedDB.get(post.get("iam", null)).get(yacySeed.NAME, "unknown");
String otherPeerName = null;
if (post.containsKey("iam")) {
yacySeed bla = yacyCore.seedDB.get(post.get("iam", ""));
if (bla != null) otherPeerName = bla.getName();
}
if (otherPeerName == null) otherPeerName = (String)header.get(httpHeader.CONNECTION_PROP_CLIENTIP);
if (col.equals("black")) {
final StringBuffer out = new StringBuffer();
@@ -101,6 +108,7 @@ public final class list {
if (display.equals("list")) {
// list urls from remote crawler queue for other peers
final int count = Math.min(post.getInt("count", 50), CrawlURLFetchStack_p.maxURLsPerFetch);
if (count > 0 && db.size() > 0) {
final StringBuffer sb = new StringBuffer();
@@ -112,11 +120,15 @@ public final class list {
cnt++;
}
prop.put("list", sb);
CrawlURLFetchStack_p.fetchMap.put(otherPeerName, new Integer(cnt));
serverLog.logInfo("URLFETCHER", "sent " + cnt + " URLs to peer " + otherPeerName);
CrawlURLFetchStack_p.fetchMap.put(
otherPeerName,
new Integer(((CrawlURLFetchStack_p.fetchMap.get(otherPeerName) == null)
? 0
: ((Integer)CrawlURLFetchStack_p.fetchMap.get(otherPeerName)).intValue()) + cnt));
serverLog.logInfo("URLFETCHER", "sent " + cnt + " URLs to " + otherPeerName);
} else {
prop.put("list", "");
serverLog.logInfo("URLFETCHER", "couldn't satisfy URL request of " + otherPeerName + ": stack is empty");
serverLog.logInfo("URLFETCHER", "couldn't satisfy URL request from " + otherPeerName + ": stack is empty");
}
} else if (display.equals("count")) {
prop.put("list", db.size());

@@ -237,6 +237,8 @@ public final class httpHeader extends TreeMap implements Map {
public static final String CONNECTION_PROP_PREV_REQUESTLINE = "PREVREQUESTLINE";
public static final String CONNECTION_PROP_REQUEST_START = "REQUEST_START";
public static final String CONNECTION_PROP_REQUEST_END = "REQUEST_END";
public static final String CONNECTION_PROP_INPUTSTREAM = "INPUTSTREAM";
public static final String CONNECTION_PROP_OUTPUTSTREAM = "OUTPUTSTREAM";
/* PROPERTIES: Client -> Proxy */
public static final String CONNECTION_PROP_CLIENT_REQUEST_HEADER = "CLIENT_REQUEST_HEADER";
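
These two properties are how a *.stream servlet reaches the raw connection: httpdFileHandler stores both streams in the request header (see its hunk further down), and the servlet casts them back out, as CrawlURLFetchStack_p does above:

// Inside a *.stream servlet's respond(): recover the raw streams for line-based I/O.
InputStream in = (InputStream) header.get(httpHeader.CONNECTION_PROP_INPUTSTREAM);
OutputStream out = (OutputStream) header.get(httpHeader.CONNECTION_PROP_OUTPUTSTREAM);
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
PrintWriter writer = new PrintWriter(out);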

@@ -491,8 +491,8 @@ public final class httpdFileHandler extends httpdAbstractHandler implements http
// call an image-servlet to produce an on-the-fly - generated image
Object img = null;
try {
requestHeader.put("CLIENTIP", conProp.getProperty("CLIENTIP"));
requestHeader.put("PATH", path);
requestHeader.put(httpHeader.CONNECTION_PROP_CLIENTIP, conProp.getProperty("CLIENTIP"));
requestHeader.put(httpHeader.CONNECTION_PROP_PATH, path);
// in case that there are no args given, args = null or empty hashmap
img = invokeServlet(targetClass, requestHeader, args);
} catch (InvocationTargetException e) {
@@ -568,10 +568,10 @@ public final class httpdFileHandler extends httpdAbstractHandler implements http
}
} else if ((targetClass != null) && (path.endsWith(".stream"))) {
// call rewrite-class
requestHeader.put("CLIENTIP", conProp.getProperty("CLIENTIP"));
requestHeader.put("PATH", path);
requestHeader.put("INPUTSTREAM", body);
requestHeader.put("OUTPUTSTREAM", out);
requestHeader.put(httpHeader.CONNECTION_PROP_CLIENTIP, conProp.getProperty("CLIENTIP"));
requestHeader.put(httpHeader.CONNECTION_PROP_PATH, path);
requestHeader.put(httpHeader.CONNECTION_PROP_INPUTSTREAM, body);
requestHeader.put(httpHeader.CONNECTION_PROP_OUTPUTSTREAM, out);
httpd.sendRespondHeader(this.connectionProperties, out, httpVersion, 200, null);
@@ -609,8 +609,8 @@ public final class httpdFileHandler extends httpdAbstractHandler implements http
} else {
// CGI-class: call the class to create a property for rewriting
try {
requestHeader.put("CLIENTIP", conProp.getProperty("CLIENTIP"));
requestHeader.put("PATH", path);
requestHeader.put(httpHeader.CONNECTION_PROP_CLIENTIP, conProp.getProperty("CLIENTIP"));
requestHeader.put(httpHeader.CONNECTION_PROP_PATH, path);
// in case that there are no args given, args = null or empty hashmap
Object tmp = invokeServlet(targetClass, requestHeader, args);
if(tmp instanceof servletProperties){
@@ -621,7 +621,7 @@ public final class httpdFileHandler extends httpdAbstractHandler implements http
// if no args given , then tp will be an empty Hashtable object (not null)
if (tp == null) tp = new servletProperties();
// check if the servlets requests authentification
if (tp.containsKey("AUTHENTICATE")) {
if (tp.containsKey(servletProperties.ACTION_AUTHENTICATE)) {
// handle brute-force protection
if (authorization != null) {
String clientIP = conProp.getProperty("CLIENTIP", "unknown-host");
@@ -634,11 +634,11 @@ public final class httpdFileHandler extends httpdAbstractHandler implements http
}
// send authentication request to browser
httpHeader headers = getDefaultHeaders(path);
headers.put(httpHeader.WWW_AUTHENTICATE,"Basic realm=\"" + tp.get("AUTHENTICATE", "") + "\"");
headers.put(httpHeader.WWW_AUTHENTICATE,"Basic realm=\"" + tp.get(servletProperties.ACTION_AUTHENTICATE, "") + "\"");
httpd.sendRespondHeader(conProp,out,httpVersion,401,headers);
return;
} else if (tp.containsKey("LOCATION")) {
String location = tp.get("LOCATION","");
} else if (tp.containsKey(servletProperties.ACTION_LOCATION)) {
String location = tp.get(servletProperties.ACTION_LOCATION, "");
if (location.length() == 0) location = path;
httpHeader headers = getDefaultHeaders(path);
@@ -648,9 +648,9 @@ public final class httpdFileHandler extends httpdAbstractHandler implements http
return;
}
// add the application version, the uptime and the client name to every rewrite table
tp.put("version", switchboard.getConfig("version", ""));
tp.put("uptime", ((System.currentTimeMillis() - Long.parseLong(switchboard.getConfig("startupTime","0"))) / 1000) / 60); // uptime in minutes
tp.put("clientname", switchboard.getConfig("peerName", "anomic"));
tp.put(servletProperties.PEER_STAT_VERSION, switchboard.getConfig("version", ""));
tp.put(servletProperties.PEER_STAT_UPTIME, ((System.currentTimeMillis() - Long.parseLong(switchboard.getConfig("startupTime","0"))) / 1000) / 60); // uptime in minutes
tp.put(servletProperties.PEER_STAT_CLIENTNAME, switchboard.getConfig("peerName", "anomic"));
//System.out.println("respond props: " + ((tp == null) ? "null" : tp.toString())); // debug
} catch (InvocationTargetException e) {
if (e.getCause() instanceof InterruptedException) {

@@ -26,6 +26,13 @@ import de.anomic.http.httpHeader;
public class servletProperties extends serverObjects {
private static final long serialVersionUID = 1L;
public static final String ACTION_AUTHENTICATE = "AUTHENTICATE";
public static final String ACTION_LOCATION = "LOCATION";
public static final String PEER_STAT_VERSION = "version";
public static final String PEER_STAT_UPTIME = "uptime";
public static final String PEER_STAT_CLIENTNAME = "clientname";
private String prefix="";
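
The new ACTION_* constants give servlets a typo-safe handle on the behaviour wired up in httpdFileHandler above: ACTION_AUTHENTICATE yields a 401 with a Basic realm, ACTION_LOCATION a redirect. A sketch of a servlet using them (realm string and target path are illustrative only):

// Illustrative fragment from a hypothetical servlet's respond():
servletProperties prop = new servletProperties();
boolean authenticated = false; // stand-in for a real credential check
if (!authenticated) {
    prop.put(servletProperties.ACTION_AUTHENTICATE, "Admin area");        // -> 401, WWW-Authenticate: Basic realm="Admin area"
} else {
    prop.put(servletProperties.ACTION_LOCATION, "/CrawlURLFetch_p.html"); // -> redirect to this path
}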
