From e427efbe54bd0b45bb33c74d7e8b8c65f8fc1f6b Mon Sep 17 00:00:00 2001 From: sixcooler Date: Sun, 14 Jun 2015 22:56:26 +0200 Subject: [PATCH] Next Try for a fix for upload-connection staying in blocked state. This was caused by reading via GZIP from close-wait connection an caused high cpu- and system-loads. Instat of implementing handling of the RedListener now I found a timelimeted 'get' "realy" solving this problem. --- .../http/servlets/YaCyDefaultServlet.java | 120 +++++++----------- 1 file changed, 46 insertions(+), 74 deletions(-) diff --git a/source/net/yacy/http/servlets/YaCyDefaultServlet.java b/source/net/yacy/http/servlets/YaCyDefaultServlet.java index 58742a9d3..824d5dfa4 100644 --- a/source/net/yacy/http/servlets/YaCyDefaultServlet.java +++ b/source/net/yacy/http/servlets/YaCyDefaultServlet.java @@ -35,14 +35,16 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URL; import java.util.AbstractMap; -import java.util.ArrayList; import java.util.Enumeration; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.zip.GZIPInputStream; import javax.servlet.ReadListener; @@ -103,6 +105,10 @@ import org.eclipse.jetty.util.MultiPartOutputStream; import org.eclipse.jetty.util.URIUtil; import org.eclipse.jetty.util.resource.Resource; +import com.google.common.util.concurrent.SimpleTimeLimiter; +import com.google.common.util.concurrent.TimeLimiter; +import com.google.common.util.concurrent.UncheckedTimeoutException; + /** * YaCyDefaultServlet based on Jetty DefaultServlet.java * handles static files and the YaCy servlets. @@ -151,6 +157,7 @@ public class YaCyDefaultServlet extends HttpServlet { protected static final File TMPDIR = new File(System.getProperty("java.io.tmpdir")); protected static final int SIZE_FILE_THRESHOLD = 100 * 1024 * 1024; // 100 MB is a lot but appropriate for multi-document pushed using the push_p.json servlet protected static final FileItemFactory DISK_FILE_ITEM_FACTORY = new DiskFileItemFactory(SIZE_FILE_THRESHOLD, TMPDIR); + private final static TimeLimiter timeLimiter = new SimpleTimeLimiter(Executors.newCachedThreadPool()); /* ------------------------------------------------------------ */ @Override public void init() throws UnavailableException { @@ -1126,65 +1133,43 @@ public class YaCyDefaultServlet extends HttpServlet { private class GZIPRequestStream extends ServletInputStream { - private final GZIPInputStream in; - private final List listeners = new ArrayList(); + private final GZIPInputStream in; + private final ServletInputStream sin; public GZIPRequestStream(HttpServletRequest request) throws IOException { - this.in = new GZIPInputStream(request.getInputStream()); + sin = request.getInputStream(); + in = new GZIPInputStream(sin); } @Override public int read() throws IOException { - if (isFinished()) return -1; - try { - final int r = in.read(); - if (r < 1) fireAllDataRead(); - return r; - } catch (final IOException ex) { - fireError(ex); - throw ex; - } + return in.read(); } @Override public int read(byte[] b) throws IOException { - return read(b, 0, b.length); + return read(b, 0, b.length); } @Override public int read(byte[] b, int off, int len) throws IOException { - if (isFinished()) return -1; - try { - final int r = in.read(b, off, len); - if (r < len) fireAllDataRead(); - return r; - } catch (final IOException ex) { - fireError(ex); - throw ex; - } + try { + return timeLimiter.callWithTimeout(new CallableReader(in, b, off, len), len + 600, TimeUnit.MILLISECONDS, false); + } catch (final UncheckedTimeoutException e) { + return -1; + } catch (Exception e) { + throw new IOException(e); + } } @Override public void close() throws IOException { - try { - in.close(); - fireAllDataRead(); - } catch (final IOException ex) { - fireError(ex); - throw ex; - } + in.close(); } @Override public int available() throws IOException { - try { - final int r = in.available(); - if (r < 1) fireAllDataRead(); - return r; - } catch (final IOException ex) { - fireError(ex); - throw ex; - } + return in.available(); } @Override @@ -1199,62 +1184,49 @@ public class YaCyDefaultServlet extends HttpServlet { @Override public synchronized void reset() throws IOException { - try { - in.reset(); - } catch (final IOException ex) { - fireError(ex); - throw ex; - } + in.reset(); } @Override public long skip(long n) throws IOException { - try { - final long r = in.skip(n); - if (r < n) fireAllDataRead(); - return r; - } catch (final IOException ex) { - fireError(ex); - throw ex; - } + return in.skip(n); } @Override public boolean isFinished() { - try { + try { return available() < 1; } catch (final IOException ex) { - fireError(ex); return true; } } @Override public boolean isReady() { - return !isFinished(); + return sin.isReady() && !isFinished(); } @Override public void setReadListener(ReadListener rl) { - listeners.add(rl); - } - - private void fireAllDataRead() throws IOException { - for (final ReadListener rl : listeners) { - rl.onAllDataRead(); - } - } - -// private void fireDataAvailable() throws IOException { -// for (final ReadListener rl : listeners) { -// rl.onDataAvailable(); -// } -// } - - private void fireError(Throwable th) { - for (final ReadListener rl : listeners) { - rl.onError(th); - } + sin.setReadListener(rl); } } + + private class CallableReader implements Callable { + private int off, len; + private byte[] b; + private GZIPInputStream in; + + public CallableReader(final GZIPInputStream in, byte[] b, int off, int len) { + this.in = in; + this.b = b; + this.off = off; + this.len = len; + } + + @Override + public Integer call() throws Exception { + return in.read(b, off, len); + } + } }