- performance hacks

- added log warnings in case that search processes run into time-out
situations
- better concurrency for Integer formatter (used a non-synchronized
formatter before)
- bugfix for search termination (a poison pill was missing)
- added timeout parameters for search (again) -> target is, that they
are never reached.
pull/1/head
Michael Peter Christen 13 years ago
parent 7a329465b3
commit e0d8643226

@ -163,8 +163,8 @@ public class AccessTracker_p {
prop.put("page_list_" + m + "_dark", ((dark) ? 1 : 0) );
dark =! dark;
prop.putHTML("page_list_" + m + "_host", query.host);
prop.put("page_list_" + m + "_date", SimpleFormatter.format(new Date(query.time.longValue())));
prop.put("page_list_" + m + "_timestamp", query.time.longValue());
prop.put("page_list_" + m + "_date", SimpleFormatter.format(new Date(query.starttime)));
prop.put("page_list_" + m + "_timestamp", query.starttime);
if (page == 2) {
// local search
prop.putNum("page_list_" + m + "_offset", query.offset);

@ -430,7 +430,7 @@ public final class search {
// update the search tracker
synchronized (trackerHandles) {
trackerHandles.add(theQuery.time); // thats the time when the handle was created
trackerHandles.add(theQuery.starttime); // thats the time when the handle was created
// we don't need too much entries in the list; remove superfluous
while (trackerHandles.size() > 36) if (!trackerHandles.remove(trackerHandles.first())) break;
}

@ -931,7 +931,7 @@ public class yacysearch {
// update the search tracker
try {
synchronized ( trackerHandles ) {
trackerHandles.add(theQuery.time);
trackerHandles.add(theQuery.starttime);
while ( trackerHandles.size() > 600 ) {
if ( !trackerHandles.remove(trackerHandles.first()) ) {
break;

@ -91,7 +91,7 @@ public class YMarkAutoTagger implements Runnable, Thread.UncaughtExceptionHandle
}
//get words from document
final Map<String, Word> words = new Condenser(document, true, true, LibraryProvider.dymLib).words();
final Map<String, Word> words = new Condenser(document, true, true, LibraryProvider.dymLib, false).words();
// generate potential tags from document title, description and subject
final int bufferSize = document.dc_title().length() + document.dc_description().length() + document.dc_subject(' ').length() + 32;

@ -26,7 +26,6 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -83,7 +82,6 @@ public final class Condenser {
public static final int flag_cat_hasvideo = 22; // the page refers to (at least one) videos
public static final int flag_cat_hasapp = 23; // the page refers to (at least one) application file
private final static int numlength = 5;
//private Properties analysis;
private final Map<String, Word> words; // a string (the words) to (indexWord) - relation
@ -96,19 +94,29 @@ public final class Condenser {
public int RESULT_DIFF_SENTENCES = -1;
public Bitfield RESULT_FLAGS = new Bitfield(4);
private final Identificator languageIdentificator;
private final NumberFormat intStringFormatter = NumberFormat.getIntegerInstance(); // use a new instance for each object for a better concurrency
/*
private final static int numlength = 5;
private static final ThreadLocal <NumberFormat> intStringFormatter =
new ThreadLocal <NumberFormat>() {
@Override protected NumberFormat initialValue() {
NumberFormat n = NumberFormat.getIntegerInstance();
n.setMinimumIntegerDigits(numlength);
n.setMaximumIntegerDigits(numlength);
return n;
}
};
*/
public Condenser(
final Document document,
final boolean indexText,
final boolean indexMedia,
final WordCache meaningLib
final WordCache meaningLib,
final boolean doAutotagging
) {
Thread.currentThread().setName("condenser-" + document.dc_identifier()); // for debugging
// if addMedia == true, then all the media links are also parsed and added to the words
// added media words are flagged with the appropriate media flag
this.intStringFormatter.setMinimumIntegerDigits(numlength);
this.intStringFormatter.setMaximumIntegerDigits(numlength);
this.words = new HashMap<String, Word>();
this.RESULT_FLAGS = new Bitfield(4);
@ -124,7 +132,7 @@ public final class Condenser {
Map.Entry<MultiProtocolURI, String> entry;
if (indexText) {
createCondensement(document.getText(), meaningLib);
createCondensement(document.getText(), meaningLib, doAutotagging);
// the phrase counter:
// phrase 0 are words taken from the URL
// phrase 1 is the MainTitle
@ -262,11 +270,11 @@ public final class Condenser {
}
}
public Condenser(final InputStream text, final WordCache meaningLib) {
public Condenser(final InputStream text, final WordCache meaningLib, boolean doAutotagging) {
this.languageIdentificator = null; // we don't need that here
// analysis = new Properties();
this.words = new TreeMap<String, Word>();
createCondensement(text, meaningLib);
createCondensement(text, meaningLib, doAutotagging);
}
public int excludeWords(final SortedSet<String> stopwords) {
@ -286,7 +294,7 @@ public final class Condenser {
return this.languageIdentificator.getLanguage();
}
private void createCondensement(final InputStream is, final WordCache meaningLib) {
private void createCondensement(final InputStream is, final WordCache meaningLib, boolean doAutotagging) {
assert is != null;
final Set<String> currsentwords = new HashSet<String>();
String word = "";
@ -312,8 +320,10 @@ public final class Condenser {
if (word.length() < wordminsize) continue;
// get tags from autotagging
tag = LibraryProvider.autotagging.getPrintTagFromWord(word);
if (tag != null) this.tags.add(tag);
if (doAutotagging) {
tag = LibraryProvider.autotagging.getPrintTagFromWord(word);
if (tag != null) this.tags.add(tag);
}
// distinguish punctuation and words
wordlen = word.length();
@ -393,7 +403,7 @@ public final class Condenser {
if (text == null) return null;
ByteArrayInputStream buffer;
buffer = new ByteArrayInputStream(UTF8.getBytes(text));
return new Condenser(buffer, meaningLib).words();
return new Condenser(buffer, meaningLib, false).words();
}
public static void main(final String[] args) {

@ -11,12 +11,12 @@
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program in the file lgpl21.txt
* If not, see <http://www.gnu.org/licenses/>.
@ -32,7 +32,6 @@ import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import net.yacy.cora.document.MultiProtocolURI;
import net.yacy.cora.document.UTF8;
import net.yacy.document.AbstractParser;
@ -42,19 +41,20 @@ import net.yacy.document.LibraryProvider;
import net.yacy.document.Parser;
import net.yacy.kelondro.data.word.Word;
import net.yacy.kelondro.util.BDecoder;
import net.yacy.kelondro.util.FileUtils;
import net.yacy.kelondro.util.BDecoder.BObject;
import net.yacy.kelondro.util.BDecoder.BType;
import net.yacy.kelondro.util.FileUtils;
// a BT parser according to http://wiki.theory.org/BitTorrentSpecification
public class torrentParser extends AbstractParser implements Parser {
public torrentParser() {
super("Torrent Metadata Parser");
SUPPORTED_EXTENSIONS.add("torrent");
SUPPORTED_MIME_TYPES.add("application/x-bittorrent");
this.SUPPORTED_EXTENSIONS.add("torrent");
this.SUPPORTED_MIME_TYPES.add("application/x-bittorrent");
}
@Override
public Document[] parse(MultiProtocolURI location, String mimeType, String charset, InputStream source)
throws Parser.Failure, InterruptedException {
byte[] b = null;
@ -102,11 +102,11 @@ public class torrentParser extends AbstractParser implements Parser {
null,
null,
title, // title
comment, // author
comment, // author
location.getHost(),
null,
null,
0.0f, 0.0f,
0.0f, 0.0f,
filenames.toString().getBytes(charset),
null,
null,
@ -116,13 +116,13 @@ public class torrentParser extends AbstractParser implements Parser {
throw new Parser.Failure("error in torrentParser, getBytes: " + e.getMessage(), location);
}
}
public static void main(String[] args) {
try {
byte[] b = FileUtils.read(new File(args[0]));
torrentParser parser = new torrentParser();
Document[] d = parser.parse(new MultiProtocolURI("http://localhost/test.torrent"), null, "UTF-8", new ByteArrayInputStream(b));
Condenser c = new Condenser(d[0], true, true, LibraryProvider.dymLib);
Condenser c = new Condenser(d[0], true, true, LibraryProvider.dymLib, false);
Map<String, Word> w = c.words();
for (Map.Entry<String, Word> e: w.entrySet()) System.out.println("Word: " + e.getKey() + " - " + e.getValue().posInText);
} catch (IOException e) {
@ -133,5 +133,5 @@ public class torrentParser extends AbstractParser implements Parser {
e.printStackTrace();
}
}
}

@ -36,6 +36,7 @@ import net.yacy.cora.document.ASCII;
import net.yacy.cora.document.UTF8;
import net.yacy.kelondro.index.Row;
import net.yacy.kelondro.index.Row.Entry;
import net.yacy.kelondro.logging.Log;
import net.yacy.kelondro.order.Base64Order;
import net.yacy.kelondro.order.Bitfield;
import net.yacy.kelondro.order.MicroDate;
@ -429,10 +430,11 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc
// transform without concurrency to omit thread creation overhead
for (final Row.Entry entry: container) try {
vars.put(new WordReferenceVars(new WordReferenceRow(entry)));
} catch (final InterruptedException e) {}
try {
vars.put(WordReferenceVars.poison);
} catch (final InterruptedException e) {}
} catch (final InterruptedException e) {} finally {
try {
vars.put(WordReferenceVars.poison);
} catch (final InterruptedException e) {}
}
return vars;
}
final Thread distributor = new TransformDistributor(container, vars, maxtime);
@ -470,13 +472,26 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc
while (p > 0) {
p--;
worker[p % cores0].add(this.container.get(p, false));
if (p % 100 == 0 && System.currentTimeMillis() > timeout) break;
if (p % 100 == 0 && System.currentTimeMillis() > timeout) {
Log.logWarning("TransformDistributor", "distribution of WordReference entries to worker queues ended with timeout = " + this.maxtime);
break;
}
}
// insert poison to stop the queues
for (int i = 0; i < cores0; i++) {
worker[i].add(WordReferenceRow.poisonRowEntry);
}
// wait for the worker to terminate because we want to place a poison entry into the out queue afterwards
for (int i = 0; i < cores0; i++) {
try {
worker[i].join();
} catch (InterruptedException e) {
}
}
this.out.add(WordReferenceVars.poison);
}
}
@ -506,7 +521,10 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc
try {
while ((entry = this.in.take()) != WordReferenceRow.poisonRowEntry) {
this.out.put(new WordReferenceVars(new WordReferenceRow(entry)));
if (System.currentTimeMillis() > timeout) break;
if (System.currentTimeMillis() > timeout) {
Log.logWarning("TransformWorker", "normalization of row entries from row to vars ended with timeout = " + this.maxtime);
break;
}
}
} catch (final InterruptedException e) {}
}

@ -520,7 +520,9 @@ public class RowSet extends RowCollection implements Index, Iterable<Row.Entry>,
int c0p, c1p;
int o;
final int objectsize = c0.rowdef.objectsize;
while (c0i < c0.size() && c1i < c1.size()) {
final int c0s = c0.size();
final int c1s = c1.size();
while (c0i < c0s && c1i < c1s) {
c0p = c0i * objectsize;
c1p = c1i * objectsize;
o = c0.rowdef.objectOrder.compare(

@ -137,10 +137,11 @@ public final class ConsoleOutErrHandler extends Handler {
if (record.getLevel().intValue() >= this.splitLevel.intValue()) {
this.stdErrHandler.publish(record);
this.stdErrHandler.flush();
} else {
this.stdOutHandler.publish(record);
this.stdOutHandler.flush();
}
flush();
}
@Override

@ -1,4 +1,4 @@
//ConsoleOutHandler.java
//ConsoleOutHandler.java
//-------------------------------------
//part of YACY
//(C) by Michael Peter Christen; mc@yacy.net
@ -34,18 +34,20 @@ import java.util.logging.StreamHandler;
public final class ConsoleOutHandler extends StreamHandler {
private static int c = 0;
public ConsoleOutHandler() {
setLevel(Level.FINEST);
setFormatter(new SimpleFormatter());
setOutputStream(System.out);
setOutputStream(System.out);
}
@Override
public final synchronized void publish(final LogRecord record) {
super.publish(record);
flush();
if (c++ % 10 == 0) flush(); // not so many flushes, makes too much IO
}
@Override
public final synchronized void close() {
flush();

@ -1,4 +1,4 @@
// Formatter.java
// Formatter.java
// -----------------------
// part of YACY
// (C) by Michael Peter Christen; mc@yacy.net
@ -40,26 +40,37 @@ import java.util.Locale;
* to the locale set for YaCy.
*/
public final class Formatter {
// default formatter
private static NumberFormat numForm = NumberFormat.getInstance(new Locale("en"));
// generic formatter that can be used when no localized formatting is allowed
private static final NumberFormat cleanNumForm =
new DecimalFormat("####.##", new DecimalFormatSymbols(Locale.ENGLISH));
static {
// just initialize defaults on class load
initDefaults();
}
private static Locale locale = new Locale("en");
/**
* use ThreadLocal to generate new formatter for each Thread since NumberFormat is not synchronized
*/
private static final ThreadLocal <NumberFormat> numForm =
new ThreadLocal <NumberFormat>() {
@Override protected NumberFormat initialValue() {
NumberFormat n = locale == null ? new DecimalFormat("####.##", new DecimalFormatSymbols(Locale.ENGLISH)) : NumberFormat.getInstance(locale);
n.setGroupingUsed(true); // always group int digits
n.setParseIntegerOnly(false); // allow int/double/float
n.setMaximumFractionDigits(2); // 2 decimal digits for float/double
return n;
}
};
private static final ThreadLocal <NumberFormat> cleanNumForm =
new ThreadLocal <NumberFormat>() {
@Override protected NumberFormat initialValue() {
NumberFormat n = new DecimalFormat("####.##", new DecimalFormatSymbols(Locale.ENGLISH));
return n;
}
};
/**
* @param locale the {@link Locale} to set or <code>null</code> to set the special
* empty locale to create unformatted numbers
*/
public static void setLocale(final Locale locale) {
numForm = (locale == null ? cleanNumForm : NumberFormat.getInstance(locale));
initDefaults();
public static void setLocale(final Locale l) {
locale = l;
}
/**
@ -67,28 +78,24 @@ public final class Formatter {
*/
public static void setLocale(final String lang) {
final String l = (lang.equalsIgnoreCase("default") ? "en" : lang.toLowerCase());
setLocale(l.equals("none") ? null : new Locale(l));
}
private static void initDefaults() {
numForm.setGroupingUsed(true); // always group int digits
numForm.setParseIntegerOnly(false); // allow int/double/float
numForm.setMaximumFractionDigits(2); // 2 decimal digits for float/double
}
public static String number(final double d, final boolean localized) {
return (localized ? number(d) : cleanNumForm.format(d));
return (localized ? numForm.get().format(d) : cleanNumForm.get().format(d));
}
public static String number(final double d) {
return numForm.format(d);
return numForm.get().format(d);
}
public static String number(final long l, final boolean localized) {
return (localized ? number(l) : cleanNumForm.format(l));
return (localized ? numForm.get().format(l) : cleanNumForm.get().format(l));
}
public static String number(final long l) {
return numForm.format(l);
return numForm.get().format(l);
}
/**
@ -107,11 +114,11 @@ public final class Formatter {
ret = number(Float.parseFloat(s));
}
} catch (final NumberFormatException e) { /* empty */ }
return (ret == null ? "-" : ret);
}
/**
* Formats a number if it are bytes to greatest unit (1024 based)
* @param byteCount

@ -786,7 +786,7 @@ public final class Protocol
// store remote result to local result container
// insert one container into the search result buffer
// one is enough, only the references are used, not the word
containerCache.add(container[0], false, target.getName() + "/" + target.hash, result.joincount, true, 5000);
containerCache.add(container[0], false, target.getName() + "/" + target.hash, result.joincount, true, time);
containerCache.addExpectedRemoteReferences(-count);
// insert the containers to the index

@ -2504,7 +2504,7 @@ public final class Switchboard extends serverSwitch
condenser[i] =
new Condenser(in.documents[i], in.queueEntry.profile().indexText(), in.queueEntry
.profile()
.indexMedia(), LibraryProvider.dymLib);
.indexMedia(), LibraryProvider.dymLib, true);
// update image result list statistics
// its good to do this concurrently here, because it needs a DNS lookup
@ -2776,7 +2776,7 @@ public final class Switchboard extends serverSwitch
throw new Parser.Failure("indexing is denied", url);
}
final Condenser condenser =
new Condenser(document, true, true, LibraryProvider.dymLib);
new Condenser(document, true, true, LibraryProvider.dymLib, true);
ResultImages.registerImages(url, document, true);
Switchboard.this.webStructure.generateCitationReference(url, document, condenser);
storeDocumentIndex(

@ -158,7 +158,7 @@ public class DocumentIndex extends Segment
final URIMetadataRow[] rows = new URIMetadataRow[documents.length];
int c = 0;
for ( final Document document : documents ) {
final Condenser condenser = new Condenser(document, true, true, LibraryProvider.dymLib);
final Condenser condenser = new Condenser(document, true, true, LibraryProvider.dymLib, true);
rows[c++] =
super.storeDocument(
url,

@ -474,7 +474,7 @@ public class Segment {
}
// get the word set
Set<String> words = null;
words = new Condenser(document, true, true, null).words().keySet();
words = new Condenser(document, true, true, null, false).words().keySet();
// delete all word references
int count = 0;

@ -82,7 +82,7 @@ public class AccessTracker {
final long timeout = System.currentTimeMillis() - maxAge;
while (list.size() > 0) {
final QueryParams q = list.getFirst();
if (q.time.longValue() > timeout) break;
if (q.starttime > timeout) break;
addToDump(list.removeFirst());
}
}
@ -103,7 +103,7 @@ public class AccessTracker {
//if (query.resultcount == 0) return;
if (query.queryString == null || query.queryString.length() == 0) return;
final StringBuilder sb = new StringBuilder(40);
sb.append(GenericFormatter.SHORT_SECOND_FORMATTER.format(new Date(query.time)));
sb.append(GenericFormatter.SHORT_SECOND_FORMATTER.format(new Date(query.starttime)));
sb.append(' ');
sb.append(Integer.toString(query.resultcount));
sb.append(' ');

@ -133,7 +133,7 @@ public final class QueryParams {
public final String tenant;
public final Modifier modifier;
public Seed remotepeer;
public final Long time;
public final long starttime, maxtime, timeout; // the time when the query started, how long it should take and the time when the timeout is reached (milliseconds)
// values that are set after a search:
public int resultcount; // number of found results
public int transmitcount; // number of results that had been shown to the user
@ -192,7 +192,9 @@ public final class QueryParams {
this.siteexcludes = null;
this.authorhash = null;
this.remotepeer = null;
this.time = Long.valueOf(System.currentTimeMillis());
this.starttime = Long.valueOf(System.currentTimeMillis());
this.maxtime = 10000;
this.timeout = this.starttime + this.timeout;
this.specialRights = false;
this.navigators = "all";
this.indexSegment = indexSegment;
@ -270,7 +272,9 @@ public final class QueryParams {
this.snippetCacheStrategy = snippetCacheStrategy;
this.host = host;
this.remotepeer = null;
this.time = Long.valueOf(System.currentTimeMillis());
this.starttime = Long.valueOf(System.currentTimeMillis());
this.maxtime = 10000;
this.timeout = this.starttime + this.timeout;
this.specialRights = specialRights;
this.indexSegment = indexSegment;
this.userAgent = userAgent;
@ -378,6 +382,7 @@ public final class QueryParams {
public static final boolean anymatch(final String text, final HandleSet keyhashes) {
// returns true if any of the word hashes in keyhashes appear in the String text
// to do this, all words in the string must be recognized and transcoded to word hashes
if (keyhashes == null || keyhashes.isEmpty()) return false;
final HandleSet wordhashes = Word.words2hashesHandles(Condenser.getWords(text, null).keySet());
return SetTools.anymatch(wordhashes, keyhashes);
}

@ -99,6 +99,7 @@ public final class RWIProcess extends Thread
private final ReferenceOrder order;
private boolean addRunning;
private final boolean remote;
private final long maxtime;
// navigation scores
private final ScoreMap<String> hostNavigator; // a counter for the appearance of the host hash
@ -145,6 +146,7 @@ public final class RWIProcess extends Thread
this.maxExpectedRemoteReferences = new AtomicInteger(0);
this.expectedRemoteReferences = new AtomicInteger(0);
this.receivedRemoteReferences = new AtomicInteger(0);
this.maxtime = query.maxtime;
}
public void addExpectedRemoteReferences(int x) {
@ -206,7 +208,7 @@ public final class RWIProcess extends Thread
System.currentTimeMillis() - timer),
false);
if ( !index.isEmpty() ) {
add(index, true, "local index: " + this.query.getSegment().getLocation(), -1, true, 10000);
add(index, true, "local index: " + this.query.getSegment().getLocation(), -1, true, this.maxtime);
}
} catch ( final Exception e ) {
Log.logException(e);
@ -260,6 +262,7 @@ public final class RWIProcess extends Thread
this.query.navigators.equals("all") || this.query.navigators.indexOf("hosts", 0) >= 0;
// apply all constraints
long timeout = System.currentTimeMillis() + maxtime;
try {
WordReferenceVars iEntry;
final String pattern = this.query.urlMask.pattern();
@ -269,9 +272,16 @@ public final class RWIProcess extends Thread
|| pattern.equals("ftp://.*")
|| pattern.equals("smb://.*")
|| pattern.equals("file://.*");
long remaining;
pollloop: while ( true ) {
iEntry = decodedEntries.poll(1, TimeUnit.SECONDS);
if ( iEntry == null || iEntry == WordReferenceVars.poison ) {
remaining = timeout - System.currentTimeMillis();
if (remaining <= 0) break;
iEntry = decodedEntries.poll(remaining, TimeUnit.MILLISECONDS);
if ( iEntry == null ) {
Log.logWarning("RWIProcess", "terminated 'add' loop after poll time-out = " + remaining);
break pollloop;
}
if ( iEntry == WordReferenceVars.poison ) {
break pollloop;
}
assert (iEntry.urlhash().length == index.row().primaryKeyLength);
@ -363,6 +373,7 @@ public final class RWIProcess extends Thread
//}
}
}
if (System.currentTimeMillis() >= timeout) Log.logWarning("RWIProcess", "rwi normalization ended with timeout = " + maxtime);
} catch ( final InterruptedException e ) {
} catch ( final RowSpaceExceededException e ) {
@ -602,9 +613,10 @@ public final class RWIProcess extends Thread
final String pagetitle = page.dc_title().toLowerCase();
// check exclusion
if ( (QueryParams.anymatch(pagetitle, this.query.excludeHashes))
if ( this.query.excludeHashes != null && !this.query.excludeHashes.isEmpty() &&
((QueryParams.anymatch(pagetitle, this.query.excludeHashes))
|| (QueryParams.anymatch(pageurl.toLowerCase(), this.query.excludeHashes))
|| (QueryParams.anymatch(pageauthor.toLowerCase(), this.query.excludeHashes)) ) {
|| (QueryParams.anymatch(pageauthor.toLowerCase(), this.query.excludeHashes)))) {
this.sortout++;
continue;
}

@ -366,7 +366,7 @@ public class SnippetProcess {
(this.rankingProcess.feedingIsFinished() && this.rankingProcess.sizeQueue() == 0)) {
break;
}
worker = new Worker(i, 10000, this.query.snippetCacheStrategy, this.query.snippetMatcher, neededResults);
worker = new Worker(i, this.query.maxtime, this.query.snippetCacheStrategy, this.query.snippetMatcher, neededResults);
worker.start();
this.workerThreads[i] = worker;
if (this.rankingProcess.expectMoreRemoteReferences()) {
@ -388,7 +388,7 @@ public class SnippetProcess {
break;
}
if (this.workerThreads[i] == null || !this.workerThreads[i].isAlive()) {
worker = new Worker(i, 10000, this.query.snippetCacheStrategy, this.query.snippetMatcher, neededResults);
worker = new Worker(i, this.query.maxtime, this.query.snippetCacheStrategy, this.query.snippetMatcher, neededResults);
worker.start();
this.workerThreads[i] = worker;
deployCount--;
@ -534,6 +534,9 @@ public class SnippetProcess {
SnippetProcess.this.rankingProcess.addTopics(resultEntry);
}
}
if (System.currentTimeMillis() >= this.timeout) {
Log.logWarning("SnippetProcess", "worker ended with timoeout");
}
//System.out.println("FINISHED WORKER " + id + " FOR " + this.neededResults + " RESULTS, loops = " + loops);
} catch (final Exception e) {
Log.logException(e);

@ -71,10 +71,6 @@ public class ReferenceOrder {
if (container.size() < 100) threads = 2;
final Thread distributor = new NormalizeDistributor(container, out, threads, maxtime);
distributor.start();
try {
distributor.join(10); // let the distributor work for at least 10 milliseconds
} catch (final InterruptedException e) {
}
// return the resulting queue while the processing queues are still working
return out;
@ -103,17 +99,21 @@ public class ReferenceOrder {
final Semaphore termination = new Semaphore(this.threads);
final NormalizeWorker[] worker = new NormalizeWorker[this.threads];
for (int i = 0; i < this.threads; i++) {
worker[i] = new NormalizeWorker(this.out, termination);
worker[i] = new NormalizeWorker(this.out, termination, this.maxtime);
worker[i].start();
}
// fill the queue
WordReferenceVars iEntry;
int p = 0;
long timeout = System.currentTimeMillis() + this.maxtime;
try {
while ((iEntry = vars.take()) != WordReferenceVars.poison) {
worker[p % this.threads].add(iEntry);
p++;
if (System.currentTimeMillis() > timeout) {
Log.logWarning("NormalizeDistributor", "adding of decoded rows to workers ended with timeout = " + this.maxtime);
}
}
} catch (final InterruptedException e) {
}
@ -136,11 +136,13 @@ public class ReferenceOrder {
private final BlockingQueue<WordReferenceVars> out;
private final Semaphore termination;
private final BlockingQueue<WordReferenceVars> decodedEntries;
private final long maxtime;
public NormalizeWorker(final BlockingQueue<WordReferenceVars> out, final Semaphore termination) {
public NormalizeWorker(final BlockingQueue<WordReferenceVars> out, final Semaphore termination, long maxtime) {
this.out = out;
this.termination = termination;
this.decodedEntries = new LinkedBlockingQueue<WordReferenceVars>();
this.maxtime = maxtime;
}
public void add(final WordReferenceVars entry) {
@ -158,6 +160,7 @@ public class ReferenceOrder {
String dom;
Integer count;
final Integer int1 = 1;
long timeout = System.currentTimeMillis() + this.maxtime;
while ((iEntry = this.decodedEntries.take()) != WordReferenceVars.poison) {
// find min/max
if (ReferenceOrder.this.min == null) ReferenceOrder.this.min = iEntry.clone(); else ReferenceOrder.this.min.min(iEntry);
@ -171,6 +174,11 @@ public class ReferenceOrder {
} else {
doms0.put(dom, LargeNumberCache.valueOf(count.intValue() + 1));
}
if (System.currentTimeMillis() > timeout) {
Log.logWarning("NormalizeWorker", "normlization of decoded rows ended with timeout = " + this.maxtime);
break;
}
}
// update domain score

Loading…
Cancel
Save