Next Try for a fix for upload-connection staying in blocked state.

This was caused by reading via GZIP from close-wait connection an caused
high cpu- and system-loads.
Instat of implementing handling of the RedListener now I found a
timelimeted 'get' "realy" solving this problem.
pull/8/head
sixcooler 10 years ago
parent 0fab445b19
commit e427efbe54

@ -35,14 +35,16 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.net.URL; import java.net.URL;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import javax.servlet.ReadListener; import javax.servlet.ReadListener;
@ -103,6 +105,10 @@ import org.eclipse.jetty.util.MultiPartOutputStream;
import org.eclipse.jetty.util.URIUtil; import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.resource.Resource; import org.eclipse.jetty.util.resource.Resource;
import com.google.common.util.concurrent.SimpleTimeLimiter;
import com.google.common.util.concurrent.TimeLimiter;
import com.google.common.util.concurrent.UncheckedTimeoutException;
/** /**
* YaCyDefaultServlet based on Jetty DefaultServlet.java * YaCyDefaultServlet based on Jetty DefaultServlet.java
* handles static files and the YaCy servlets. * handles static files and the YaCy servlets.
@ -151,6 +157,7 @@ public class YaCyDefaultServlet extends HttpServlet {
protected static final File TMPDIR = new File(System.getProperty("java.io.tmpdir")); protected static final File TMPDIR = new File(System.getProperty("java.io.tmpdir"));
protected static final int SIZE_FILE_THRESHOLD = 100 * 1024 * 1024; // 100 MB is a lot but appropriate for multi-document pushed using the push_p.json servlet protected static final int SIZE_FILE_THRESHOLD = 100 * 1024 * 1024; // 100 MB is a lot but appropriate for multi-document pushed using the push_p.json servlet
protected static final FileItemFactory DISK_FILE_ITEM_FACTORY = new DiskFileItemFactory(SIZE_FILE_THRESHOLD, TMPDIR); protected static final FileItemFactory DISK_FILE_ITEM_FACTORY = new DiskFileItemFactory(SIZE_FILE_THRESHOLD, TMPDIR);
private final static TimeLimiter timeLimiter = new SimpleTimeLimiter(Executors.newCachedThreadPool());
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public void init() throws UnavailableException { public void init() throws UnavailableException {
@ -1126,65 +1133,43 @@ public class YaCyDefaultServlet extends HttpServlet {
private class GZIPRequestStream extends ServletInputStream { private class GZIPRequestStream extends ServletInputStream {
private final GZIPInputStream in; private final GZIPInputStream in;
private final List<ReadListener> listeners = new ArrayList<ReadListener>(); private final ServletInputStream sin;
public GZIPRequestStream(HttpServletRequest request) throws IOException { public GZIPRequestStream(HttpServletRequest request) throws IOException {
this.in = new GZIPInputStream(request.getInputStream()); sin = request.getInputStream();
in = new GZIPInputStream(sin);
} }
@Override @Override
public int read() throws IOException { public int read() throws IOException {
if (isFinished()) return -1; return in.read();
try {
final int r = in.read();
if (r < 1) fireAllDataRead();
return r;
} catch (final IOException ex) {
fireError(ex);
throw ex;
}
} }
@Override @Override
public int read(byte[] b) throws IOException { public int read(byte[] b) throws IOException {
return read(b, 0, b.length); return read(b, 0, b.length);
} }
@Override @Override
public int read(byte[] b, int off, int len) throws IOException { public int read(byte[] b, int off, int len) throws IOException {
if (isFinished()) return -1; try {
try { return timeLimiter.callWithTimeout(new CallableReader(in, b, off, len), len + 600, TimeUnit.MILLISECONDS, false);
final int r = in.read(b, off, len); } catch (final UncheckedTimeoutException e) {
if (r < len) fireAllDataRead(); return -1;
return r; } catch (Exception e) {
} catch (final IOException ex) { throw new IOException(e);
fireError(ex); }
throw ex;
}
} }
@Override @Override
public void close() throws IOException { public void close() throws IOException {
try { in.close();
in.close();
fireAllDataRead();
} catch (final IOException ex) {
fireError(ex);
throw ex;
}
} }
@Override @Override
public int available() throws IOException { public int available() throws IOException {
try { return in.available();
final int r = in.available();
if (r < 1) fireAllDataRead();
return r;
} catch (final IOException ex) {
fireError(ex);
throw ex;
}
} }
@Override @Override
@ -1199,62 +1184,49 @@ public class YaCyDefaultServlet extends HttpServlet {
@Override @Override
public synchronized void reset() throws IOException { public synchronized void reset() throws IOException {
try { in.reset();
in.reset();
} catch (final IOException ex) {
fireError(ex);
throw ex;
}
} }
@Override @Override
public long skip(long n) throws IOException { public long skip(long n) throws IOException {
try { return in.skip(n);
final long r = in.skip(n);
if (r < n) fireAllDataRead();
return r;
} catch (final IOException ex) {
fireError(ex);
throw ex;
}
} }
@Override @Override
public boolean isFinished() { public boolean isFinished() {
try { try {
return available() < 1; return available() < 1;
} catch (final IOException ex) { } catch (final IOException ex) {
fireError(ex);
return true; return true;
} }
} }
@Override @Override
public boolean isReady() { public boolean isReady() {
return !isFinished(); return sin.isReady() && !isFinished();
} }
@Override @Override
public void setReadListener(ReadListener rl) { public void setReadListener(ReadListener rl) {
listeners.add(rl); sin.setReadListener(rl);
}
private void fireAllDataRead() throws IOException {
for (final ReadListener rl : listeners) {
rl.onAllDataRead();
}
} }
}
// private void fireDataAvailable() throws IOException { private class CallableReader implements Callable<Integer> {
// for (final ReadListener rl : listeners) { private int off, len;
// rl.onDataAvailable(); private byte[] b;
// } private GZIPInputStream in;
// }
public CallableReader(final GZIPInputStream in, byte[] b, int off, int len) {
private void fireError(Throwable th) { this.in = in;
for (final ReadListener rl : listeners) { this.b = b;
rl.onError(th); this.off = off;
} this.len = len;
} }
@Override
public Integer call() throws Exception {
return in.read(b, off, len);
}
} }
} }

Loading…
Cancel
Save