- fixed a bug in crawl start with file name (NPE in new URL)

- added deletion of the Solr index in IndexControlRWIs
- added asynchronous adding of large URL lists (happens when crawls are started with a file)
- fixed an NPE in image display
- replaced a language warning with fine-level logging
- added a domain name cache in Domains that speeds up the isLocal check (fewer DNS lookups)
- added a new storage class for this cache: KeyList. The domain key list is stored in DATA/WORK/globalhosts.list
- added concurrent Solr updates and chunked transfers (50 documents per commit) for high-speed feeding (> 40000 ppm)
- fixed a bug in the content scraper that chopped off large parts of crawl lists (when using crawl start from a file)

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@7666 6c8d7289-2bf4-0310-a012-ef5d649a1542
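
The domain cache works as sketched below. This is a minimal, standalone illustration of the idea behind DATA/WORK/globalhosts.list, not YaCy code; the class and method names are hypothetical. Once a host has resolved to a public address it is remembered, so later isLocal checks can answer without another DNS lookup.

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class GlobalHostCacheSketch {
    // hosts already known to resolve to a global (non-local) address
    private static final Set<String> globalHosts =
            Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    public static boolean isLocal(String host) {
        if (globalHosts.contains(host)) return false; // cache hit: no DNS lookup needed
        try {
            InetAddress a = InetAddress.getByName(host); // the expensive part
            boolean local = a.isAnyLocalAddress() || a.isLinkLocalAddress()
                    || a.isLoopbackAddress() || a.isSiteLocalAddress();
            if (!local) globalHosts.add(host); // remember global hosts for next time
            return local;
        } catch (UnknownHostException e) {
            return true; // the patch below also treats unresolvable hosts as local
        }
    }

    public static void main(String[] args) {
        System.out.println(isLocal("example.org")); // resolves once
        System.out.println(isLocal("example.org")); // answered from the cache
    }
}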

@@ -134,9 +134,16 @@ public class Crawler_p {
if (crawlingStart.startsWith("ftp")) crawlingStart = "ftp://" + crawlingStart;
}
+ // remove crawlingFileContent before we record the call
+ final String crawlingFileName = post.get("crawlingFile");
+ final File crawlingFile = (crawlingFileName != null && crawlingFileName.length() > 0) ? new File(crawlingFileName) : null;
+ if (crawlingFile != null && crawlingFile.exists()) {
+ post.remove("crawlingFile$file");
+ }
// normalize URL
DigestURI crawlingStartURL = null;
- try {crawlingStartURL = new DigestURI(crawlingStart);} catch (final MalformedURLException e1) {Log.logException(e1);}
+ if (crawlingFile == null) try {crawlingStartURL = new DigestURI(crawlingStart);} catch (final MalformedURLException e1) {Log.logException(e1);}
crawlingStart = (crawlingStartURL == null) ? null : crawlingStartURL.toNormalform(true, true);
// set new properties
@@ -193,13 +200,6 @@ public class Crawler_p {
long crawlingIfOlder = recrawlIfOlderC(crawlingIfOlderCheck, crawlingIfOlderNumber, crawlingIfOlderUnit);
env.setConfig("crawlingIfOlder", crawlingIfOlder);
- // remove crawlingFileContent before we record the call
- final String crawlingFileName = post.get("crawlingFile");
- final File crawlingFile = (crawlingFileName != null && crawlingFileName.length() > 0) ? new File(crawlingFileName) : null;
- if (crawlingFile != null && crawlingFile.exists()) {
- post.remove("crawlingFile$file");
- }
// store this call as api call
if (repeat_time > 0) {
// store as scheduled api call
@@ -437,7 +437,7 @@ public class Crawler_p {
cachePolicy);
sb.crawler.putActive(profile.handle().getBytes(), profile);
sb.pauseCrawlJob(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL);
- sb.crawlStacker.enqueueEntries(sb.peers.mySeed().hash.getBytes(), profile.handle(), hyperlinks, true);
+ sb.crawlStacker.enqueueEntriesAsynchronous(sb.peers.mySeed().hash.getBytes(), profile.handle(), hyperlinks, true);
} catch (final PatternSyntaxException e) {
prop.put("info", "4"); // crawlfilter does not match url
prop.putHTML("info_newcrawlingfilter", newcrawlingMustMatch);

@@ -41,10 +41,11 @@
<dl>
<dt class="TableCellDark">Index Deletion</dt>
<dd><input type="checkbox" name="deleteIndex" id="deleteIndex"
onclick="x=document.getElementById('deleteIndex').checked;document.getElementById('deleteCache').checked=x;document.getElementById('deleteRobots').checked=x;document.getElementById('deleteCrawlQueues').checked=x;c='disabled';document.getElementById('deleteSearchFl').checked=x;if(x){c='';}document.getElementById('deletecomplete').disabled=c;document.getElementById('deleteCache').disabled=c;document.getElementById('deleteRobots').disabled=c;document.getElementById('deleteCrawlQueues').disabled=c;document.getElementById('deleteSearchFl').disabled=c;"
onclick="x=document.getElementById('deleteIndex').checked;document.getElementById('deleteRobots').checked=x;document.getElementById('deleteCrawlQueues').checked=x;c='disabled';document.getElementById('deleteSearchFl').checked=x;if(x){c='';}document.getElementById('deletecomplete').disabled=c;document.getElementById('deleteCache').disabled=c;document.getElementById('deleteRobots').disabled=c;document.getElementById('deleteCrawlQueues').disabled=c;document.getElementById('deleteSearchFl').disabled=c;"
/><label for="deleteIndex">Delete Search Index</label><br/>
<input type="checkbox" name="deleteCrawlQueues" id="deleteCrawlQueues" disabled="disabled" /><label for="deleteCrawlQueues">Stop Crawler and delete Crawl Queues</label><br/>
#(solr)#::<input type="checkbox" name="deleteSolr" id="deleteSolr" /><label for="deleteSolr">Delete Solr Index</label><br/>#(/solr)#
<input type="checkbox" name="deleteCache" id="deleteCache" disabled="disabled" /><label for="deleteCache">Delete HTTP &amp; FTP Cache</label><br/>
<input type="checkbox" name="deleteCrawlQueues" id="deleteCrawlQueues" disabled="disabled" /><label for="deleteCrawlQueues">Stop Crawler and delete Crawl Queues</label><br/>
<input type="checkbox" name="deleteRobots" id="deleteRobots" disabled="disabled" /><label for="deleteRobots">Delete robots.txt Cache</label><br/>
<input type="checkbox" name="deleteSearchFl" id="deleteSearchFl" disabled="disabled" /><label for="deleteSearchFl">Delete cached snippet-fetching failures during search</label><br/><br/><br/>
<input type="submit" name="deletecomplete" id="deletecomplete" value="Delete" disabled="disabled"/>

@@ -84,6 +84,7 @@ public class IndexControlRWIs_p {
prop.put("keyhash", "");
prop.put("result", "");
prop.put("cleanup", post == null ? 1 : 0);
prop.put("cleanup_solr", sb.solrConnector == null ? 0 : 1);
String segmentName = sb.getConfig(SwitchboardConstants.SEGMENT_PUBLIC, "default");
int i = 0;
@@ -153,6 +154,9 @@ public class IndexControlRWIs_p {
if (post.get("deleteIndex", "").equals("on")) {
segment.clear();
}
if (post.get("deleteSolr", "").equals("on")) {
sb.solrConnector.clear();
}
if (post.get("deleteCrawlQueues", "").equals("on")) {
sb.crawlQueues.clear();
sb.crawlStacker.clear();

@@ -231,6 +231,13 @@ public final class CrawlStacker {
}
}
}
public void enqueueEntriesAsynchronous(final byte[] initiator, final String profileHandle, final Map<MultiProtocolURI, String> hyperlinks, boolean replace) {
new Thread() {
public void run() {
enqueueEntries(initiator, profileHandle, hyperlinks, true);
}
}.start();
}
public void enqueueEntries(byte[] initiator, String profileHandle, Map<MultiProtocolURI, String> hyperlinks, boolean replace) {
for (Map.Entry<MultiProtocolURI, String> e: hyperlinks.entrySet()) {
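
The new enqueueEntriesAsynchronous above wraps the existing synchronous method in a fire-and-forget thread, so a crawl start with a large URL file returns immediately instead of blocking until all URLs are stacked. The parameters referenced inside the anonymous class must be final (pre-Java-8), and the inner call hard-codes true for the replace flag, which matches the only call site in Crawler_p above. A generic, standalone sketch of the pattern with hypothetical names:

import java.util.Arrays;
import java.util.List;

public class AsyncWrapperSketch {
    // stands in for the blocking bulk operation (enqueueEntries in the diff)
    static void processAll(List<String> items) {
        for (String s : items) System.out.println("stacking " + s);
    }

    // same shape as enqueueEntriesAsynchronous: start a throwaway thread, return at once
    static void processAllAsynchronous(final List<String> items) {
        new Thread() {
            @Override
            public void run() {
                processAll(items);
            }
        }.start();
    }

    public static void main(String[] args) {
        processAllAsynchronous(Arrays.asList("a", "b", "c"));
        System.out.println("returned immediately");
    }
}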

@@ -63,6 +63,7 @@ public class ResultImages {
final Map<MultiProtocolURI, ImageEntry> images = document.getImages();
for (final ImageEntry image: images.values()) {
// do a double-check; attention: this can be time-consuming since this possibly needs a DNS-lookup
if (image == null || image.url() == null) continue;
if (doubleCheck.containsKey(image.url())) continue;
doubleCheck.put(image.url(), System.currentTimeMillis());

@@ -300,10 +300,10 @@ public class Segment {
if (!u.contains("/" + language + "/") && !u.contains("/" + ISO639.country(language).toLowerCase() + "/")) {
// no confirmation using the url, use the TLD
language = url.language();
log.logWarning(error + ", corrected using the TLD");
if (log.isFine()) log.logFine(error + ", corrected using the TLD");
} else {
// this is a strong hint that the statistics was in fact correct
log.logWarning(error + ", but the url proves that the statistic is correct");
if (log.isFine()) log.logFine(error + ", but the url proves that the statistic is correct");
}
}
} else {
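
The guard pattern used here, testing log.isFine() before calling log.logFine(...), avoids building the concatenated message string at all when fine logging is disabled, which matters on this hot indexing path. The equivalent with plain java.util.logging (names hypothetical):

import java.util.logging.Level;
import java.util.logging.Logger;

public class GuardedLoggingSketch {
    private static final Logger log = Logger.getLogger("Segment");

    public static void main(String[] args) {
        String error = "language detection mismatch"; // hypothetical message
        if (log.isLoggable(Level.FINE)) { // skip the string concatenation when FINE is off
            log.fine(error + ", corrected using the TLD");
        }
    }
}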

@@ -240,7 +240,7 @@ public final class Switchboard extends serverSwitch {
private final Semaphore shutdownSync = new Semaphore(0);
private boolean terminate = false;
- private SolrSingleConnector solrConnector = null;
+ public SolrSingleConnector solrConnector = null;
//private Object crawlingPausedSync = new Object();
//private boolean crawlingIsPaused = false;
@@ -293,6 +293,10 @@ public final class Switchboard extends serverSwitch {
this.dictionariesPath = getDataPath(SwitchboardConstants.DICTIONARY_SOURCE_PATH, SwitchboardConstants.DICTIONARY_SOURCE_PATH_DEFAULT);
this.log.logConfig("Dictionaries Path:" + this.dictionariesPath.toString());
// init global host name cache
this.workPath.mkdirs();
Domains.init(new File(workPath, "globalhosts.list"));
// init sessionid name file
final String sessionidNamesFile = getConfig("sessionidNamesFile","defaults/sessionid.names");
this.log.logConfig("Loading sessionid file " + sessionidNamesFile);
@@ -587,7 +591,7 @@ public final class Switchboard extends serverSwitch {
// set up the solr interface
String solrurl = this.getConfig("federated.service.solr.indexing.url", "http://127.0.0.1:8983/solr");
boolean usesolr = this.getConfigBool("federated.service.solr.indexing.enabled", false) & solrurl.length() > 0;
this.solrConnector = (usesolr) ? new SolrSingleConnector("http://127.0.0.1:8983/solr", SolrScheme.SolrCell) : null;
this.solrConnector = (usesolr) ? new SolrSingleConnector(solrurl, SolrScheme.SolrCell) : null;
// initializing dht chunk generation
this.dhtMaxReferenceCount = (int) getConfigLong(SwitchboardConstants.INDEX_DIST_CHUNK_SIZE_START, 50);
@@ -1207,6 +1211,8 @@ public final class Switchboard extends serverSwitch {
peers.close();
Cache.close();
tables.close();
Domains.close();
if (solrConnector != null) solrConnector.close();
AccessTracker.dumpLog(new File("DATA/LOG/queries.log"));
UPnP.deletePortMapping();
Tray.removeTray();

@@ -24,6 +24,8 @@
package net.yacy.cora.protocol;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
@@ -43,6 +45,7 @@ import java.util.regex.Pattern;
import net.yacy.cora.storage.ARC;
import net.yacy.cora.storage.ConcurrentARC;
import net.yacy.cora.storage.KeyList;
import net.yacy.kelondro.util.MemoryControl;
public class Domains {
@@ -422,7 +425,23 @@ public class Domains {
insertTLDProps(TLD_Generic, TLD_Generic_ID);
// the id=7 is used to flag local addresses
}
private static KeyList globalHosts;
public static void init(File globalHostsnameCache) {
if (globalHostsnameCache == null) {
globalHosts = null;
} else try {
globalHosts = new KeyList(globalHostsnameCache);
} catch (IOException e) {
globalHosts = null;
}
}
public static void close() {
if (globalHosts != null) try {globalHosts.close();} catch (IOException e) {}
}
/**
* Does a DNS check to resolve a hostname to an IP.
*
@@ -531,14 +550,20 @@ public class Domains {
return null;
}
- if ((ip != null) &&
- (!ip.isLoopbackAddress()) &&
- (!matchesList(host, nameCacheNoCachingPatterns))
- ) {
- // add new entries
+ if (ip != null && !ip.isLoopbackAddress() && !matchesList(host, nameCacheNoCachingPatterns)) {
+ // add new ip cache entries
NAME_CACHE_HIT.put(host, ip);
+ // add also the isLocal host name caches
+ boolean localp = ip.isAnyLocalAddress() || ip.isLinkLocalAddress() || ip.isSiteLocalAddress();
+ if (localp) {
+ localHostNames.add(host);
+ } else {
+ if (globalHosts != null) try {globalHosts.add(host);} catch (IOException e) {}
+ }
}
LOOKUP_SYNC.remove(host);
return ip;
}
}
@@ -772,27 +797,17 @@ public class Domains {
// check if there are other local IP addresses that are not in
// the standard IP range
if (localHostNames.contains(host)) return true;
- /*
- for (InetAddress a: localHostAddresses) {
- String hostname = getHostName(a);
- if (hostname != null && hostname.equals(host)) return true;
- if (a.getHostAddress().equals(host)) return true;
- }
- */
+ if (globalHosts != null && globalHosts.contains(host)) return false;
// check dns lookup: may be a local address even if the domain name looks global
if (!recursive) return false;
final InetAddress a = dnsResolve(host);
- /*
- if (a == null) {
- // unknown if this is a local address. Could also be a timeout.
- // It would be harmful to declare any public address as local, therefore return false
- return false;
- }
- */
- return a == null || a.isAnyLocalAddress() || a.isLinkLocalAddress() || a.isLoopbackAddress() || a.isSiteLocalAddress() || isLocal(a.getHostAddress(), false);
+ boolean localp = a == null || a.isAnyLocalAddress() || a.isLinkLocalAddress() || a.isLoopbackAddress() || a.isSiteLocalAddress() || isLocal(a.getHostAddress(), false);
+ return localp;
}
public static void main(final String[] args) {
/*
try {

@@ -196,7 +196,6 @@ public class SolrHTTPClient extends SolrServer {
HTTPClient client = new HTTPClient();
if (SolrRequest.METHOD.POST == request.getMethod()) {
boolean isMultipart = ( streams != null && streams.size() > 1 );
if (streams == null || isMultipart) {
String url = _baseURL + path;
@@ -228,6 +227,7 @@ public class SolrHTTPClient extends SolrServer {
client.finish();
}
} else {
// It has one stream, this is the post body, put the params in the URL
String pstr = ClientUtils.toQueryString(params, false);
String url = _baseURL + path + pstr;

@@ -30,6 +30,8 @@ import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
@@ -40,6 +42,7 @@ import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
import net.yacy.document.Document;
import net.yacy.kelondro.logging.Log;
public class SolrSingleConnector {
@@ -48,14 +51,74 @@ public class SolrSingleConnector {
private SolrServer server;
private SolrScheme scheme;
private final static int transmissionQueueCount = 4; // allow concurrent http sessions to solr
private final static int transmissionQueueSize = 50; // number of documents that are collected until a commit is sent
private Worker[] transmissionWorker; // the transmission workers to solr
private BlockingQueue<SolrInputDocument>[] transmissionQueue; // the queues where documents are collected
private int transmissionRoundRobinCounter; // a round-robin counter for the transmission queues
@SuppressWarnings("unchecked")
public SolrSingleConnector(String url, SolrScheme scheme) throws IOException {
this.solrurl = url;
this.scheme = scheme;
transmissionRoundRobinCounter = 0;
this.transmissionQueue = new ArrayBlockingQueue[transmissionQueueCount];
for (int i = 0; i < transmissionQueueCount; i++) {
this.transmissionQueue[i] = new ArrayBlockingQueue<SolrInputDocument>(transmissionQueueSize);
}
try {
this.server = new SolrHTTPClient(this.solrurl);
} catch (MalformedURLException e) {
throw new IOException("bad connector url: " + this.solrurl);
}
this.transmissionWorker = new Worker[transmissionQueueCount];
for (int i = 0; i < transmissionQueueCount; i++) {
this.transmissionWorker[i] = new Worker(i);
this.transmissionWorker[i].start();
}
}
private class Worker extends Thread {
boolean shallRun;
int idx;
public Worker(int i) {
this.idx = i;
this.shallRun = true;
}
public void pleaseStop() {
this.shallRun = false;
}
public void run() {
while (this.shallRun) {
if (transmissionQueue[idx].size() > 0) {
try {
flushTransmissionQueue(idx);
} catch (IOException e) {
Log.logSevere("SolrSingleConnector", "flush Transmission failed in worker", e);
continue;
}
} else {
try {Thread.sleep(1000);} catch (InterruptedException e) {}
}
}
try {
flushTransmissionQueue(idx);
} catch (IOException e) {}
}
}
public void close() {
for (int i = 0; i < transmissionQueueCount; i++) {
if (this.transmissionWorker[i].isAlive()) {
this.transmissionWorker[i].pleaseStop();
try {this.transmissionWorker[i].join();} catch (InterruptedException e) {}
}
}
for (int i = 0; i < transmissionQueueCount; i++) {
try {
flushTransmissionQueue(i);
} catch (IOException e) {}
}
}
/**
@@ -65,6 +128,7 @@ public class SolrSingleConnector {
public void clear() throws IOException {
try {
server.deleteByQuery("*:*");
server.commit();
} catch (SolrServerException e) {
throw new IOException(e);
}
@@ -128,13 +192,19 @@ public class SolrSingleConnector {
}
public void add(String id, Document doc, SolrScheme tempScheme) throws IOException {
- addSolr(tempScheme.yacy2solr(id, doc));
- }
- protected void addSolr(SolrInputDocument doc) throws IOException {
- Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
- docs.add(doc);
- addSolr(docs);
+ SolrInputDocument solrdoc = tempScheme.yacy2solr(id, doc);
+ int thisrrc = this.transmissionRoundRobinCounter;
+ int nextrrc = thisrrc++;
+ if (nextrrc >= transmissionQueueCount) nextrrc = 0;
+ this.transmissionRoundRobinCounter = nextrrc;
+ if (this.transmissionWorker[thisrrc].isAlive()) {
+ this.transmissionQueue[thisrrc].offer(solrdoc);
+ } else {
+ if (this.transmissionQueue[thisrrc].size() > 0) flushTransmissionQueue(thisrrc);
+ Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
+ docs.add(solrdoc);
+ addSolr(docs);
+ }
}
protected void addSolr(Collection<SolrInputDocument> docs) throws IOException {
@@ -152,6 +222,19 @@ public class SolrSingleConnector {
}
}
private void flushTransmissionQueue(int idx) throws IOException {
Collection<SolrInputDocument> c = new ArrayList<SolrInputDocument>();
while (this.transmissionQueue[idx].size() > 0) {
try {
c.add(this.transmissionQueue[idx].take());
} catch (InterruptedException e) {
continue;
}
}
addSolr(c);
}
/**
* get a query result from solr
* to get all results set the query String to "*:*"
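
The feeding pipeline above is four bounded queues of 50 documents each, with one worker per queue that drains its queue and sends the whole batch in a single commit; documents are distributed over the queues round-robin. A standalone sketch of the producer/worker batching pattern, reduced to a single queue and using drainTo instead of the diff's take() loop (all names hypothetical):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BatchedFeederSketch {
    static final int BATCH = 50; // the chunk size used in the diff

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(BATCH);
        Thread worker = new Thread() {
            @Override
            public void run() {
                final List<String> batch = new ArrayList<String>();
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        batch.add(queue.take()); // block until a document arrives
                        queue.drainTo(batch);    // then grab everything else queued
                        System.out.println("commit of " + batch.size() + " docs");
                        batch.clear();
                    }
                } catch (InterruptedException e) {
                    // shutdown requested
                }
            }
        };
        worker.start();
        for (int i = 0; i < 120; i++) queue.put("doc" + i); // producers block when the queue is full
        Thread.sleep(200);
        worker.interrupt();
    }
}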

@@ -0,0 +1,99 @@
/**
* KeyList
* Copyright 2011 by Michael Peter Christen, mc@yacy.net, Frankfurt a. M., Germany
* First released 18.4.2011 at http://yacy.net
*
* $LastChangedDate: 2011-03-22 10:34:10 +0100 (Di, 22 Mrz 2011) $
* $LastChangedRevision: 7619 $
* $LastChangedBy: orbiter $
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program in the file lgpl21.txt
* If not, see <http://www.gnu.org/licenses/>.
*/
package net.yacy.cora.storage;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.GZIPInputStream;
import net.yacy.cora.document.UTF8;
/**
* a key list is a file which contains a list of key words, one word per line.
* The key list is loaded into a Java set object, and the list can be extended on the fly
* by appending just another line to the file.
* When a key list file is initialized, all lines are read and pushed into a Java set.
*/
public class KeyList {
private static final Object _obj = new Object();
private Map<String, Object> keys;
private RandomAccessFile raf;
public KeyList(File file) throws IOException {
this.keys = new ConcurrentHashMap<String, Object>();
if (file.exists()) {
InputStream is = new FileInputStream(file);
if (file.getName().endsWith(".gz")) {
is = new GZIPInputStream(is);
}
final BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
String l;
try {
while ((l = reader.readLine()) != null) {
if (l.length() == 0 || l.charAt(0) == '#') continue;
l = l.trim().toLowerCase();
this.keys.put(l, _obj);
}
} catch (IOException e) {
// finish
}
}
this.raf = new RandomAccessFile(file, "rw");
}
public boolean contains(String key) {
return this.keys.containsKey(key);
}
public void add(String key) throws IOException {
if (keys.containsKey(key)) return;
synchronized (this.raf) {
if (keys.containsKey(key)) return; // check again for threads that arrive late (after another thread has already written this key)
this.keys.put(key, _obj);
this.raf.seek(raf.length());
this.raf.write(UTF8.getBytes(key));
this.raf.writeByte('\n');
}
}
public void close() throws IOException {
synchronized (this.raf) {
raf.close();
}
}
}
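
Usage of the class above is straightforward; a short sketch (the file name is hypothetical):

import java.io.File;
import java.io.IOException;
import net.yacy.cora.storage.KeyList;

public class KeyListDemo {
    public static void main(String[] args) throws IOException {
        KeyList hosts = new KeyList(new File("globalhosts.list"));
        hosts.add("example.org"); // appends one line to the file, only once
        System.out.println(hosts.contains("example.org")); // true, answered from memory
        hosts.close();
    }
}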

@@ -178,9 +178,8 @@ public class ContentScraper extends AbstractScraper implements Scraper {
}
break location;
}
// find tags inside text
- String b = cleanLine(super.stripAll(newtext));
+ String b = cleanLine(super.stripAllTags(newtext));
if ((insideTag != null) && (!(insideTag.equals("a")))) {
// texts inside tags sometimes have no punctuation at the line end
// this is bad for the text semantics, because it is not possible for the
