fixed deadlock

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4562 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 17 years ago
parent c565906050
commit 9c989fe5f7

@ -395,12 +395,14 @@ public final class httpc {
// trying to establish a connection to the address // trying to establish a connection to the address
this.initTime = System.currentTimeMillis(); this.initTime = System.currentTimeMillis();
this.lastIO = System.currentTimeMillis(); this.lastIO = System.currentTimeMillis();
this.socket.setKeepAlive(false); this.socket.setKeepAlive(true);
// set socket timeout and keep alive behavior // set socket timeout and keep alive behavior
assert timeout >= 1000; assert timeout >= 1000;
this.socket.setSoTimeout(timeout); // waiting time for read this.socket.setSoTimeout(timeout); // waiting time for read
this.socket.setTcpNoDelay(true); // no accumulation until buffer is full this.socket.setTcpNoDelay(true); // no accumulation until buffer is full
this.socket.setSoLinger(true, timeout); // wait for all data being written on close() this.socket.setSoLinger(true, timeout); // wait for all data being written on close()
this.socket.setSendBufferSize(1440); // read http://www.cisco.com/warp/public/105/38.shtml
this.socket.setReceiveBufferSize(1440); // read http://www.cisco.com/warp/public/105/38.shtml
// get the connection // get the connection
this.socket.connect(address, timeout); this.socket.connect(address, timeout);
@ -1411,8 +1413,9 @@ public final class httpc {
* *
* @param procOS * @param procOS
* @param file * @param file
* @throws IOException
*/ */
public void writeContent(Object procOS, File file) { public void writeContent(Object procOS, File file) throws IOException {
// this writes the input stream to either another output stream or // this writes the input stream to either another output stream or
// a file or both. // a file or both.
FileOutputStream bufferOS = null; FileOutputStream bufferOS = null;
@ -1421,22 +1424,18 @@ public final class httpc {
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
file = null; file = null;
} }
try { InputStream is = this.getContentInputStream();
InputStream is = this.getContentInputStream(); if (procOS == null) {
if (procOS == null) { writeX(is, null, bufferOS);
writeX(is, null, bufferOS); } else if (procOS instanceof OutputStream) {
} else if (procOS instanceof OutputStream) { writeX(is, (OutputStream) procOS, bufferOS);
writeX(is, (OutputStream) procOS, bufferOS); //writeContentX(httpc.this.clientInput, this.gzip, this.responseHeader.contentLength(), procOS, bufferOS);
//writeContentX(httpc.this.clientInput, this.gzip, this.responseHeader.contentLength(), procOS, bufferOS); } else if (procOS instanceof Writer) {
} else if (procOS instanceof Writer) { String charSet = this.responseHeader.getCharacterEncoding();
String charSet = this.responseHeader.getCharacterEncoding(); if (charSet == null) charSet = httpHeader.DEFAULT_CHARSET;
if (charSet == null) charSet = httpHeader.DEFAULT_CHARSET; writeX(is, charSet, (Writer) procOS, bufferOS, charSet);
writeX(is, charSet, (Writer) procOS, bufferOS, charSet); } else {
} else { throw new IllegalArgumentException("Invalid procOS object type '" + procOS.getClass().getName() + "'");
throw new IllegalArgumentException("Invalid procOS object type '" + procOS.getClass().getName() + "'");
}
} catch (IOException e) {
e.printStackTrace();
} }
if (bufferOS != null) { if (bufferOS != null) {
@ -1451,12 +1450,12 @@ public final class httpc {
} }
public void writeX(InputStream source, OutputStream procOS, OutputStream bufferOS) { public void writeX(InputStream source, OutputStream procOS, OutputStream bufferOS) throws IOException {
byte[] buffer = new byte[2048]; byte[] buffer = new byte[2048];
int l, c = 0; int l, c = 0;
lastIO = System.currentTimeMillis(); lastIO = System.currentTimeMillis();
io: while (true) try { io: while (true) {
l = source.read(buffer, 0, buffer.length); l = source.read(buffer, 0, buffer.length);
if (l < 0) break; if (l < 0) break;
if (l == 0) try { if (l == 0) try {
@ -1470,10 +1469,6 @@ public final class httpc {
c += l; c += l;
if (procOS != null) procOS.write(buffer, 0, l); if (procOS != null) procOS.write(buffer, 0, l);
if (bufferOS != null) bufferOS.write(buffer, 0, l); if (bufferOS != null) bufferOS.write(buffer, 0, l);
} catch (IOException e) {
System.out.println("*** DEBUG: writeX/IOStream terminated with IOException, processed " + c + " bytes. cause: " + e.getMessage());
e.printStackTrace();
break;
} }
// flush the streams // flush the streams

@ -96,7 +96,6 @@ public class httpdByteCountInputStream extends FilterInputStream {
if (readCount > 0) this.byteCount += readCount; if (readCount > 0) this.byteCount += readCount;
return readCount; return readCount;
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace();
throw new IOException(e.getMessage() + "; b.length = " + b.length + ", off = " + off + ", len = " + len); throw new IOException(e.getMessage() + "; b.length = " + b.length + ", off = " + off + ", len = " + len);
} }
} }

@ -118,6 +118,7 @@ public class plasmaCrawlBalancer {
public synchronized void close() { public synchronized void close() {
while (domainStacksNotEmpty()) flushOnceDomStacks(0, true); // flush to ram, because the ram flush is optimized while (domainStacksNotEmpty()) flushOnceDomStacks(0, true); // flush to ram, because the ram flush is optimized
size();
try { flushAllRamStack(); } catch (IOException e) {} try { flushAllRamStack(); } catch (IOException e) {}
if (urlFileIndex != null) { if (urlFileIndex != null) {
urlFileIndex.close(); urlFileIndex.close();

@ -62,9 +62,9 @@ public class plasmaCrawlNURL {
public static final int STACK_TYPE_MOVIE = 12; // put on movie stack public static final int STACK_TYPE_MOVIE = 12; // put on movie stack
public static final int STACK_TYPE_MUSIC = 13; // put on music stack public static final int STACK_TYPE_MUSIC = 13; // put on music stack
private static final long minimumLocalDelta = 50; // the minimum time difference between access of the same local domain private static final long minimumLocalDelta = 10; // the minimum time difference between access of the same local domain
private static final long minimumGlobalDelta = 500; // the minimum time difference between access of the same global domain private static final long minimumGlobalDelta = 333; // the minimum time difference between access of the same global domain
private static final long maximumDomAge = 60000; // the maximum age of a domain until it is used for another crawl attempt private static final long maximumDomAge = 60000; // the maximum age of a domain until it is used for another crawl attempt
private plasmaCrawlBalancer coreStack; // links found by crawling to depth-1 private plasmaCrawlBalancer coreStack; // links found by crawling to depth-1
private plasmaCrawlBalancer limitStack; // links found by crawling at target depth private plasmaCrawlBalancer limitStack; // links found by crawling at target depth

@ -245,9 +245,7 @@ public final class plasmaSearchEvent {
// sort the local containers and truncate it to a limited count, // sort the local containers and truncate it to a limited count,
// so following sortings together with the global results will be fast // so following sortings together with the global results will be fast
synchronized (rankedCache) { rankedCache.execQuery();
rankedCache.execQuery();
}
} }
} }

@ -1719,6 +1719,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
log.logConfig("SWITCHBOARD SHUTDOWN STEP 2: sending termination signal to threaded indexing"); log.logConfig("SWITCHBOARD SHUTDOWN STEP 2: sending termination signal to threaded indexing");
// closing all still running db importer jobs // closing all still running db importer jobs
this.dbImportManager.close(); this.dbImportManager.close();
httpc.closeAllConnections();
crawlQueues.close(); crawlQueues.close();
wikiDB.close(); wikiDB.close();
blogDB.close(); blogDB.close();
@ -1733,7 +1734,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
parser.close(); parser.close();
plasmaHTCache.close(); plasmaHTCache.close();
sbQueue.close(); sbQueue.close();
httpc.closeAllConnections();
webStructure.flushCitationReference("crg"); webStructure.flushCitationReference("crg");
webStructure.close(); webStructure.close();
log.logConfig("SWITCHBOARD SHUTDOWN STEP 3: sending termination signal to database manager (stand by...)"); log.logConfig("SWITCHBOARD SHUTDOWN STEP 3: sending termination signal to database manager (stand by...)");

@ -340,24 +340,18 @@ public final class plasmaWordIndex implements indexRI {
// get from cache // get from cache
indexContainer container; indexContainer container;
synchronized (dhtOutCache) { container = dhtOutCache.getContainer(wordHash, urlselection);
container = dhtOutCache.getContainer(wordHash, urlselection); if (container == null) {
} container = dhtInCache.getContainer(wordHash, urlselection);
synchronized (dhtInCache) { } else {
if (container == null) { container.addAllUnique(dhtInCache.getContainer(wordHash, urlselection));
container = dhtInCache.getContainer(wordHash, urlselection);
} else {
container.addAllUnique(dhtInCache.getContainer(wordHash, urlselection));
}
} }
// get from collection index // get from collection index
synchronized (this) { if (container == null) {
if (container == null) { container = collections.getContainer(wordHash, urlselection);
container = collections.getContainer(wordHash, urlselection); } else {
} else { container.addAllUnique(collections.getContainer(wordHash, urlselection));
container.addAllUnique(collections.getContainer(wordHash, urlselection));
}
} }
if (container == null) return null; if (container == null) return null;
@ -443,10 +437,8 @@ public final class plasmaWordIndex implements indexRI {
public void close() { public void close() {
dhtInCache.close(); dhtInCache.close();
dhtOutCache.close(); dhtOutCache.close();
synchronized (this) { collections.close();
collections.close(); loadedURL.close();
loadedURL.close();
}
} }
public indexContainer deleteContainer(String wordHash) { public indexContainer deleteContainer(String wordHash) {
@ -455,29 +447,17 @@ public final class plasmaWordIndex implements indexRI {
indexRWIRowEntry.urlEntryRow, indexRWIRowEntry.urlEntryRow,
dhtInCache.sizeContainer(wordHash) + dhtOutCache.sizeContainer(wordHash) + collections.indexSize(wordHash) dhtInCache.sizeContainer(wordHash) + dhtOutCache.sizeContainer(wordHash) + collections.indexSize(wordHash)
); );
synchronized (dhtInCache) { c.addAllUnique(dhtInCache.deleteContainer(wordHash));
c.addAllUnique(dhtInCache.deleteContainer(wordHash)); c.addAllUnique(dhtOutCache.deleteContainer(wordHash));
} c.addAllUnique(collections.deleteContainer(wordHash));
synchronized (dhtOutCache) {
c.addAllUnique(dhtOutCache.deleteContainer(wordHash));
}
synchronized (this) {
c.addAllUnique(collections.deleteContainer(wordHash));
}
return c; return c;
} }
public boolean removeEntry(String wordHash, String urlHash) { public boolean removeEntry(String wordHash, String urlHash) {
boolean removed = false; boolean removed = false;
synchronized (dhtInCache) { removed = removed | (dhtInCache.removeEntry(wordHash, urlHash));
removed = removed | (dhtInCache.removeEntry(wordHash, urlHash)); removed = removed | (dhtOutCache.removeEntry(wordHash, urlHash));
} removed = removed | (collections.removeEntry(wordHash, urlHash));
synchronized (dhtOutCache) {
removed = removed | (dhtOutCache.removeEntry(wordHash, urlHash));
}
synchronized (this) {
removed = removed | (collections.removeEntry(wordHash, urlHash));
}
return removed; return removed;
} }
@ -494,29 +474,17 @@ public final class plasmaWordIndex implements indexRI {
public int removeEntries(String wordHash, Set<String> urlHashes) { public int removeEntries(String wordHash, Set<String> urlHashes) {
int removed = 0; int removed = 0;
synchronized (dhtInCache) { removed += dhtInCache.removeEntries(wordHash, urlHashes);
removed += dhtInCache.removeEntries(wordHash, urlHashes); removed += dhtOutCache.removeEntries(wordHash, urlHashes);
} removed += collections.removeEntries(wordHash, urlHashes);
synchronized (dhtOutCache) {
removed += dhtOutCache.removeEntries(wordHash, urlHashes);
}
synchronized (this) {
removed += collections.removeEntries(wordHash, urlHashes);
}
return removed; return removed;
} }
public String removeEntriesExpl(String wordHash, Set<String> urlHashes) { public String removeEntriesExpl(String wordHash, Set<String> urlHashes) {
String removed = ""; String removed = "";
synchronized (dhtInCache) { removed += dhtInCache.removeEntries(wordHash, urlHashes) + ", ";
removed += dhtInCache.removeEntries(wordHash, urlHashes) + ", "; removed += dhtOutCache.removeEntries(wordHash, urlHashes) + ", ";
} removed += collections.removeEntries(wordHash, urlHashes);
synchronized (dhtOutCache) {
removed += dhtOutCache.removeEntries(wordHash, urlHashes) + ", ";
}
synchronized (this) {
removed += collections.removeEntries(wordHash, urlHashes);
}
return removed; return removed;
} }

@ -423,7 +423,9 @@ public final class serverCore extends serverAbstractThread implements serverThre
controlSocket.setSoLinger(true, this.timeout); controlSocket.setSoLinger(true, this.timeout);
// ensure that MTU-48 is not exceeded to prevent that routers cannot handle large data packets // ensure that MTU-48 is not exceeded to prevent that routers cannot handle large data packets
// read http://www.cisco.com/warp/public/105/38.shtml for explanation
controlSocket.setSendBufferSize(1440); controlSocket.setSendBufferSize(1440);
controlSocket.setReceiveBufferSize(1440);
// create session // create session
Session connection = new Session(sessionThreadGroup, controlSocket, this.timeout); Session connection = new Session(sessionThreadGroup, controlSocket, this.timeout);

@ -99,7 +99,7 @@ public final class yacyClient {
HashMap<String, String> result = null; HashMap<String, String> result = null;
final serverObjects post = yacyNetwork.basicRequestPost(plasmaSwitchboard.getSwitchboard(), null); final serverObjects post = yacyNetwork.basicRequestPost(plasmaSwitchboard.getSwitchboard(), null);
for (int retry = 0; retry < 3; retry++) try { for (int retry = 0; retry < 4; retry++) try {
// generate request // generate request
post.put("count", "20"); post.put("count", "20");
post.put("seed", yacyCore.seedDB.mySeed().genSeedStr(post.get("key", ""))); post.put("seed", yacyCore.seedDB.mySeed().genSeedStr(post.get("key", "")));
@ -119,10 +119,10 @@ public final class yacyClient {
break; break;
} catch (Exception e) { } catch (Exception e) {
if (Thread.currentThread().isInterrupted()) { if (Thread.currentThread().isInterrupted()) {
yacyCore.log.logFine("yacyClient.publishMySeed thread '" + Thread.currentThread().getName() + "' interrupted."); yacyCore.log.logWarning("yacyClient.publishMySeed thread '" + Thread.currentThread().getName() + "' interrupted.");
return -1; return -1;
} else { } else {
yacyCore.log.logFine("yacyClient.publishMySeed thread '" + Thread.currentThread().getName() + "' exception: " + e.getMessage() + "; retry = " + retry); // here VERY OFTEN a 'Connection reset' appears. What is the cause? yacyCore.log.logWarning("yacyClient.publishMySeed thread '" + Thread.currentThread().getName() + "' exception: " + e.getMessage() + "; retry = " + retry); // here VERY OFTEN a 'Connection reset' appears. What is the cause?
// try again (go into loop) // try again (go into loop)
} }
result = null; result = null;

Loading…
Cancel
Save