@ -1,3 +1,51 @@
// plasmaSwitchboard.java
// -----------------------
// part of YaCy
// (C) by Michael Peter Christen; mc@anomic.de
// first published on http://www.anomic.de
// Frankfurt, Germany, 2005
//
// This file was contributed by Martin Thelian
//
// $LastChangedDate: 2005-10-07 15:49:07 +0200 (Fri, 07 Oct 2005) $
// $LastChangedRevision: 874 $
// $LastChangedBy: allo $
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
// Using this software in any meaning (reading, learning, copying, compiling,
// running) means that you agree that the Author(s) is (are) not responsible
// for cost, loss of data or any harm that may be caused directly or indirectly
// by usage of this softare or this documentation. The usage of this software
// is on your own risk. The installation and usage (starting/running) of this
// software may allow other people or application to access your computer and
// any attached devices and is highly dependent on the configuration of the
// software which must be done by the user of the software; the author(s) is
// (are) also not responsible for proper configuration and usage of the
// software, even if provoked by documentation provided together with
// the software.
//
// Any changes to this file according to the GPL as documented in the file
// gpl.txt aside this file in the shipment you received can be done to the
// lines that follows this copyright notice here, but changes must not be
// done inside the copyright notive above. A re-distribution must contain
// the intact and unchanged copyright notice.
// Contributions and changes to the program code must be marked as such.
package de.anomic.plasma ;
import java.io.File ;
@ -6,14 +54,12 @@ import java.net.InetAddress;
import java.net.MalformedURLException ;
import java.net.URL ;
import java.net.UnknownHostException ;
import java.util.ArrayList ;
import java.util.Date ;
import java.util.Iterator ;
import java.util.LinkedList ;
import org.apache.commons.pool.impl.GenericObjectPool ;
import de.anomic.data.robotsParser ;
import de.anomic.http.httpc ;
import de.anomic.kelondro.kelondroTree ;
import de.anomic.kelondro.kelondroRecords.Node ;
import de.anomic.server.serverCodings ;
@ -22,82 +68,56 @@ import de.anomic.server.logging.serverLog;
import de.anomic.tools.bitfield ;
import de.anomic.yacy.yacyCore ;
public final class plasma StackCrawlThread extends Thread {
public final class plasma CrawlStacker {
final WorkerPool theWorkerPool ;
final ThreadGroup theWorkerThreadGroup = new ThreadGroup ( "stackCrawlThreadGroup" ) ;
final serverLog log = new serverLog ( "STACKCRAWL" ) ;
final plasmaSwitchboard sb ;
private final serverLog log = new serverLog ( "STACKCRAWL" ) ;
private final plasmaSwitchboard sb ;
private boolean stopped = false ;
private stackCrawlQueue queue ;
public plasma StackCrawlThread ( plasmaSwitchboard sb , File dbPath , int dbCacheSize ) throws IOException {
public plasma CrawlStacker ( plasmaSwitchboard sb , File dbPath , int dbCacheSize ) throws IOException {
this . sb = sb ;
this . setName ( this . getClass ( ) . getName ( ) ) ;
this . queue = new stackCrawlQueue ( dbPath , dbCacheSize ) ;
this . log . logInfo ( this . queue . size ( ) + " entries in the stackCrawl queue." ) ;
this . log . logInfo ( "STACKCRAWL thread initialized." ) ;
this . theWorkerPool = new WorkerPool ( new WorkterFactory ( this . theWorkerThreadGroup ) ) ;
}
/**
 * Requests termination of this thread and waits until it has finished.
 *
 * Sets the stop flag, interrupts the thread so that a blocking wait is
 * aborted, and then joins it. If the calling thread is itself interrupted
 * while waiting, the wait is abandoned and the caller's interrupt status is
 * restored so that code further up the stack can react to it.
 */
public void stopIt() {
    this.stopped = true;
    this.interrupt();
    try {
        this.join();
    } catch (InterruptedException e) {
        // join() cleared the caller's interrupt flag when it threw; set it again
        // instead of dumping a stack trace and silently losing the signal.
        Thread.currentThread().interrupt();
        this.log.logFine("Interrupted while waiting for the stackCrawl thread to terminate.");
    }
}
public int getQueueSize ( ) {
public int size ( ) {
return this . queue . size ( ) ;
}
public void run ( ) {
while ( ( ! this . stopped ) & & ( ! Thread . currentThread ( ) . isInterrupted ( ) ) ) {
public void job ( ) {
try {
// getting a new message from the crawler queue
stackCrawlMessage theMsg = this . queue . waitForMessage ( ) ;
try {
// getting a new message from the crawler queue
stackCrawlMessage theMsg = this . queue . waitForMessage ( ) ;
// getting a free session thread from the pool
Worker worker = ( Worker ) this . theWorkerPool . borrowObject ( ) ;
// processing the new request
worker . execute ( theMsg ) ;
} catch ( InterruptedException e ) {
Thread . interrupted ( ) ;
this . stopped = true ;
// process message
String rejectReason = dequeue ( theMsg ) ;
if ( rejectReason ! = null ) {
this . sb . urlPool . errorURL . newEntry (
new URL ( theMsg . url ( ) ) ,
theMsg . referrerHash ( ) ,
theMsg . initiatorHash ( ) ,
yacyCore . seedDB . mySeed . hash ,
theMsg . name ,
rejectReason ,
new bitfield ( plasmaURL . urlFlagLength ) ,
false
) ;
}
catch ( Exception e ) {
this . log . logSevere ( "plasmaStackCrawlThread.run/loop" , e ) ;
}
} catch ( InterruptedException e ) {
Thread . interrupted ( ) ;
this . stopped = true ;
}
catch ( Exception e ) {
this . log . logSevere ( "plasmaStackCrawlThread.run/loop" , e ) ;
}
try {
this . log . logFine ( "Shutdown. Terminationg worker threads." ) ;
this . theWorkerPool . close ( ) ;
} catch ( Exception e1 ) {
this . log . logSevere ( "Unable to shutdown all remaining stackCrawl threads" , e1 ) ;
}
try {
this . log . logFine ( "Shutdown. Closing stackCrawl queue." ) ;
this . queue . close ( ) ;
} catch ( IOException e ) {
this . log . logSevere ( "DB could not be closed properly." , e ) ;
}
this . log . logInfo ( "Shutdown finished." ) ;
}
public void stackCrawlEnqueue (
public void enqueue (
String nexturlString ,
String referrerString ,
String initiatorHash ,
@ -122,7 +142,7 @@ public final class plasmaStackCrawlThread extends Thread {
}
}
p ublic String stackCrawlD equeue( stackCrawlMessage theMsg ) throws InterruptedException {
p rivate String d equeue( stackCrawlMessage theMsg ) throws InterruptedException {
plasmaCrawlProfile . entry profile = this . sb . profiles . getEntry ( theMsg . profileHandle ( ) ) ;
if ( profile = = null ) {
@ -145,7 +165,6 @@ public final class plasmaStackCrawlThread extends Thread {
// stacks a crawl item. The position can also be remote
// returns null if successful, a reason string if not successful
long startTime = System . currentTimeMillis ( ) ;
String reason = null ; // failure reason
// strange errors
@ -168,8 +187,7 @@ public final class plasmaStackCrawlThread extends Thread {
nexturl = new URL ( nexturlString ) ;
} catch ( MalformedURLException e ) {
reason = "denied_(url_'" + nexturlString + "'_wrong)" ;
this . log . logSevere ( "Wrong URL in stackCrawl: " + nexturlString +
". Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
this . log . logSevere ( "Wrong URL in stackCrawl: " + nexturlString ) ;
return reason ;
}
@ -178,19 +196,16 @@ public final class plasmaStackCrawlThread extends Thread {
InetAddress hostAddress = InetAddress . getByName ( nexturl . getHost ( ) ) ;
if ( hostAddress . isSiteLocalAddress ( ) ) {
reason = "denied_(private_ip_address)" ;
this . log . logFine ( "Host in URL '" + nexturlString + "' has private ip address." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
this . log . logFine ( "Host in URL '" + nexturlString + "' has private ip address." ) ;
return reason ;
} else if ( hostAddress . isLoopbackAddress ( ) ) {
reason = "denied_(loopback_ip_address)" ;
this . log . logFine ( "Host in URL '" + nexturlString + "' has loopback ip address." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
this . log . logFine ( "Host in URL '" + nexturlString + "' has loopback ip address." ) ;
return reason ;
}
} catch ( UnknownHostException e ) {
reason = "denied_(unknown_host)" ;
this . log . logFine ( "Unknown host in URL '" + nexturlString + "'." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
this . log . logFine ( "Unknown host in URL '" + nexturlString + "'." ) ;
return reason ;
}
@ -198,8 +213,7 @@ public final class plasmaStackCrawlThread extends Thread {
String hostlow = nexturl . getHost ( ) . toLowerCase ( ) ;
if ( plasmaSwitchboard . urlBlacklist . isListed ( hostlow , nexturl . getPath ( ) ) ) {
reason = "denied_(url_in_blacklist)" ;
this . log . logFine ( "URL '" + nexturlString + "' is in blacklist." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
this . log . logFine ( "URL '" + nexturlString + "' is in blacklist." ) ;
return reason ;
}
@ -209,8 +223,7 @@ public final class plasmaStackCrawlThread extends Thread {
/ *
urlPool . errorURL . newEntry ( nexturl , referrerHash , initiatorHash , yacyCore . seedDB . mySeed . hash ,
name , reason , new bitfield ( plasmaURL . urlFlagLength ) , false ) ; * /
this . log . logFine ( "URL '" + nexturlString + "' does not match crawling filter '" + profile . generalFilter ( ) + "'." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
this . log . logFine ( "URL '" + nexturlString + "' does not match crawling filter '" + profile . generalFilter ( ) + "'." ) ;
return reason ;
}
@ -220,8 +233,7 @@ public final class plasmaStackCrawlThread extends Thread {
/ *
urlPool . errorURL . newEntry ( nexturl , referrerHash , initiatorHash , yacyCore . seedDB . mySeed . hash ,
name , reason , new bitfield ( plasmaURL . urlFlagLength ) , false ) ; * /
this . log . logFine ( "URL '" + nexturlString + "' is cgi URL." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
this . log . logFine ( "URL '" + nexturlString + "' is cgi URL." ) ;
return reason ;
}
@ -231,8 +243,7 @@ public final class plasmaStackCrawlThread extends Thread {
/ *
urlPool . errorURL . newEntry ( nexturl , referrerHash , initiatorHash , yacyCore . seedDB . mySeed . hash ,
name , reason , new bitfield ( plasmaURL . urlFlagLength ) , false ) ; * /
this . log . logFine ( "URL '" + nexturlString + "' is post URL." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
this . log . logFine ( "URL '" + nexturlString + "' is post URL." ) ;
return reason ;
}
@ -244,8 +255,7 @@ public final class plasmaStackCrawlThread extends Thread {
/ *
urlPool . errorURL . newEntry ( nexturl , referrerHash , initiatorHash , yacyCore . seedDB . mySeed . hash ,
name , reason , new bitfield ( plasmaURL . urlFlagLength ) , false ) ; * /
this . log . logFine ( "URL '" + nexturlString + "' is double registered in '" + dbocc + "'." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
this . log . logFine ( "URL '" + nexturlString + "' is double registered in '" + dbocc + "'." ) ;
return reason ;
}
@ -255,8 +265,7 @@ public final class plasmaStackCrawlThread extends Thread {
/ *
urlPool . errorURL . newEntry ( nexturl , referrerHash , initiatorHash , yacyCore . seedDB . mySeed . hash ,
name , reason , new bitfield ( plasmaURL . urlFlagLength ) , false ) ; * /
this . log . logFine ( "Crawling of URL '" + nexturlString + "' disallowed by robots.txt." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
this . log . logFine ( "Crawling of URL '" + nexturlString + "' disallowed by robots.txt." ) ;
return reason ;
}
@ -271,7 +280,7 @@ public final class plasmaStackCrawlThread extends Thread {
( yacyCore . seedDB . mySeed . isPrincipal ( ) ) ) /* qualified */ ;
if ( ( ! local ) & & ( ! global ) ) {
this . log . log Sever e( "URL '" + nexturlString + "' can neither be crawled local nor global." ) ;
this . log . log Fin e( "URL '" + nexturlString + "' can neither be crawled local nor global." ) ;
}
this . sb . urlPool . noticeURL . newEntry ( initiatorHash , /* initiator, needed for p2p-feedback */
@ -522,7 +531,7 @@ public final class plasmaStackCrawlThread extends Thread {
byte [ ] [ ] entryBytes = null ;
stackCrawlMessage newMessage = null ;
synchronized ( this . urlEntryHashCache ) {
urlHash = ( String ) this . urlEntryHashCache . remove Fir st( ) ;
urlHash = ( String ) this . urlEntryHashCache . remove La st( ) ;
entryBytes = this . urlEntryCache . remove ( urlHash . getBytes ( ) ) ;
}
@ -533,262 +542,4 @@ public final class plasmaStackCrawlThread extends Thread {
}
}
public final class WorkterFactory implements org . apache . commons . pool . PoolableObjectFactory {
final ThreadGroup workerThreadGroup ;
public WorkterFactory ( ThreadGroup theWorkerThreadGroup ) {
super ( ) ;
if ( theWorkerThreadGroup = = null )
throw new IllegalArgumentException ( "The threadgroup object must not be null." ) ;
this . workerThreadGroup = theWorkerThreadGroup ;
}
public Object makeObject ( ) {
Worker newWorker = new Worker ( this . workerThreadGroup ) ;
newWorker . setPriority ( Thread . MAX_PRIORITY ) ;
return newWorker ;
}
/ * *
* @see org . apache . commons . pool . PoolableObjectFactory # destroyObject ( java . lang . Object )
* /
public void destroyObject ( Object obj ) {
if ( obj instanceof Worker ) {
Worker theWorker = ( Worker ) obj ;
theWorker . setStopped ( true ) ;
}
}
/ * *
* @see org . apache . commons . pool . PoolableObjectFactory # validateObject ( java . lang . Object )
* /
public boolean validateObject ( Object obj ) {
if ( obj instanceof Worker )
{
Worker theWorker = ( Worker ) obj ;
if ( ! theWorker . isAlive ( ) | | theWorker . isInterrupted ( ) ) return false ;
if ( theWorker . isRunning ( ) ) return true ;
return false ;
}
return true ;
}
/ * *
* @param obj
*
* /
public void activateObject ( Object obj ) {
//log.debug(" activateObject...");
}
/ * *
* @param obj
*
* /
public void passivateObject ( Object obj ) {
//log.debug(" passivateObject..." + obj);
// if (obj instanceof Session) {
// Session theSession = (Session) obj;
// }
}
}
public final class WorkerPool extends GenericObjectPool {
public boolean isClosed = false ;
/ * *
* First constructor .
* @param objFactory
* /
public WorkerPool ( WorkterFactory objFactory ) {
super ( objFactory ) ;
this . setMaxIdle ( 10 ) ; // Maximum idle threads.
this . setMaxActive ( 50 ) ; // Maximum active threads.
this . setMinEvictableIdleTimeMillis ( 30000 ) ; //Evictor runs every 30 secs.
//this.setMaxWait(1000); // Wait 1 second till a thread is available
}
public WorkerPool ( plasmaStackCrawlThread . WorkterFactory objFactory ,
GenericObjectPool . Config config ) {
super ( objFactory , config ) ;
}
public Object borrowObject ( ) throws Exception {
return super . borrowObject ( ) ;
}
public void returnObject ( Object obj ) throws Exception {
super . returnObject ( obj ) ;
}
public synchronized void close ( ) throws Exception {
/ *
* shutdown all still running session threads . . .
* /
this . isClosed = true ;
/* waiting for all threads to finish */
int threadCount = plasmaStackCrawlThread . this . theWorkerThreadGroup . activeCount ( ) ;
Thread [ ] threadList = new Thread [ threadCount ] ;
threadCount = plasmaStackCrawlThread . this . theWorkerThreadGroup . enumerate ( threadList ) ;
try {
// trying to gracefull stop all still running sessions ...
plasmaStackCrawlThread . this . log . logInfo ( "Signaling shutdown to " + threadCount + " remaining stackCrawl threads ..." ) ;
for ( int currentThreadIdx = 0 ; currentThreadIdx < threadCount ; currentThreadIdx + + ) {
Thread currentThread = threadList [ currentThreadIdx ] ;
if ( currentThread . isAlive ( ) ) {
( ( Worker ) currentThread ) . setStopped ( true ) ;
}
}
// waiting a frew ms for the session objects to continue processing
try { Thread . sleep ( 500 ) ; } catch ( InterruptedException ex ) { }
// interrupting all still running or pooled threads ...
plasmaStackCrawlThread . this . log . logInfo ( "Sending interruption signal to " + plasmaStackCrawlThread . this . theWorkerThreadGroup . activeCount ( ) + " remaining stackCrawl threads ..." ) ;
plasmaStackCrawlThread . this . theWorkerThreadGroup . interrupt ( ) ;
// if there are some sessions that are blocking in IO, we simply close the socket
plasmaStackCrawlThread . this . log . logFine ( "Trying to abort " + plasmaStackCrawlThread . this . theWorkerThreadGroup . activeCount ( ) + " remaining stackCrawl threads ..." ) ;
for ( int currentThreadIdx = 0 ; currentThreadIdx < threadCount ; currentThreadIdx + + ) {
Thread currentThread = threadList [ currentThreadIdx ] ;
if ( currentThread . isAlive ( ) ) {
plasmaStackCrawlThread . this . log . logInfo ( "Trying to shutdown stackCrawl thread '" + currentThread . getName ( ) + "' [" + currentThreadIdx + "]." ) ;
( ( Worker ) currentThread ) . close ( ) ;
}
}
// we need to use a timeout here because of missing interruptable session threads ...
plasmaStackCrawlThread . this . log . logFine ( "Waiting for " + plasmaStackCrawlThread . this . theWorkerThreadGroup . activeCount ( ) + " remaining stackCrawl threads to finish shutdown ..." ) ;
for ( int currentThreadIdx = 0 ; currentThreadIdx < threadCount ; currentThreadIdx + + ) {
Thread currentThread = threadList [ currentThreadIdx ] ;
if ( currentThread . isAlive ( ) ) {
plasmaStackCrawlThread . this . log . logFine ( "Waiting for stackCrawl thread '" + currentThread . getName ( ) + "' [" + currentThreadIdx + "] to finish shutdown." ) ;
try { currentThread . join ( 500 ) ; } catch ( InterruptedException ex ) { }
}
}
plasmaStackCrawlThread . this . log . logInfo ( "Shutdown of remaining stackCrawl threads finish." ) ;
} catch ( Exception e ) {
plasmaStackCrawlThread . this . log . logSevere ( "Unexpected error while trying to shutdown all remaining stackCrawl threads." , e ) ;
}
super . close ( ) ;
}
}
public final class Worker extends Thread {
private boolean running = false ;
private boolean stopped = false ;
private boolean done = false ;
private stackCrawlMessage theMsg ;
public Worker ( ThreadGroup theThreadGroup ) {
super ( theThreadGroup , "stackCrawlThread" ) ;
}
public void setStopped ( boolean stopped ) {
this . stopped = stopped ;
}
public void close ( ) {
if ( this . isAlive ( ) ) {
try {
// trying to close all still open httpc-Sockets first
int closedSockets = httpc . closeOpenSockets ( this ) ;
if ( closedSockets > 0 ) {
plasmaStackCrawlThread . this . log . logInfo ( closedSockets + " HTTP-client sockets of thread '" + this . getName ( ) + "' closed." ) ;
}
} catch ( Exception e ) { }
}
}
public synchronized void execute ( stackCrawlMessage newMsg ) {
this . theMsg = newMsg ;
this . done = false ;
if ( ! this . running ) {
// this.setDaemon(true);
this . start ( ) ;
} else {
this . notifyAll ( ) ;
}
}
public void reset ( ) {
this . done = true ;
this . theMsg = null ;
}
public boolean isRunning ( ) {
return this . running ;
}
public void run ( ) {
this . running = true ;
// The thread keeps running.
while ( ! this . stopped & & ! Thread . interrupted ( ) ) {
if ( this . done ) {
// We are waiting for a task now.
synchronized ( this ) {
try {
this . wait ( ) ; //Wait until we get a request to process.
} catch ( InterruptedException e ) {
this . stopped = true ;
// log.error("", e);
}
}
} else {
//There is a task....let us execute it.
try {
execute ( ) ;
} catch ( Exception e ) {
// log.error("", e);
} finally {
reset ( ) ;
if ( ! this . stopped & & ! this . isInterrupted ( ) & & ! plasmaStackCrawlThread . this . theWorkerPool . isClosed ) {
try {
this . setName ( "stackCrawlThread_inPool" ) ;
plasmaStackCrawlThread . this . theWorkerPool . returnObject ( this ) ;
} catch ( Exception e1 ) {
// e1.printStackTrace();
this . stopped = true ;
}
}
}
}
}
}
private void execute ( ) throws InterruptedException {
try {
String rejectReason = stackCrawlDequeue ( this . theMsg ) ;
if ( rejectReason ! = null ) {
plasmaStackCrawlThread . this . sb . urlPool . errorURL . newEntry (
new URL ( this . theMsg . url ( ) ) ,
this . theMsg . referrerHash ( ) ,
this . theMsg . initiatorHash ( ) ,
yacyCore . seedDB . mySeed . hash ,
this . theMsg . name ,
rejectReason ,
new bitfield ( plasmaURL . urlFlagLength ) ,
false
) ;
}
} catch ( Exception e ) {
e . printStackTrace ( ) ;
} finally {
this . done = true ;
}
}
}
}