diff --git a/source/de/anomic/search/ReferenceOrder.java b/source/de/anomic/search/ReferenceOrder.java index 63bef61e4..1641b82bc 100644 --- a/source/de/anomic/search/ReferenceOrder.java +++ b/source/de/anomic/search/ReferenceOrder.java @@ -64,9 +64,10 @@ public class ReferenceOrder { } public BlockingQueue normalizeWith(final ReferenceContainer container) { - BlockingQueue vars = WordReferenceVars.transform(container); LinkedBlockingQueue out = new LinkedBlockingQueue(); - Thread distributor = new NormalizeDistributor(vars, out); + int threads = cores + 1; + if (container.size() < 20) threads = 2; + Thread distributor = new NormalizeDistributor(container, out, threads); distributor.start(); // return the resulting queue while the processing queues are still working @@ -75,21 +76,25 @@ public class ReferenceOrder { public class NormalizeDistributor extends Thread { - BlockingQueue vars; + ReferenceContainer container; LinkedBlockingQueue out; + private int threads; - public NormalizeDistributor(BlockingQueue vars, LinkedBlockingQueue out) { - this.vars = vars; + public NormalizeDistributor(ReferenceContainer container, LinkedBlockingQueue out, int threads) { + this.container = container; this.out = out; + this.threads = threads; } @Override public void run() { + // transform the reference container into a stream of parsed entries + BlockingQueue vars = WordReferenceVars.transform(container); + // start the transformation threads - int cores0 = cores + 1; - Semaphore termination = new Semaphore(cores0); - NormalizeWorker[] worker = new NormalizeWorker[cores0]; - for (int i = 0; i < cores0; i++) { + Semaphore termination = new Semaphore(this.threads); + NormalizeWorker[] worker = new NormalizeWorker[this.threads]; + for (int i = 0; i < this.threads; i++) { worker[i] = new NormalizeWorker(out, termination); worker[i].start(); } @@ -99,17 +104,20 @@ public class ReferenceOrder { int p = 0; try { while ((iEntry = vars.take()) != WordReferenceVars.poison) { - worker[p % cores0].add(iEntry); + worker[p % this.threads].add(iEntry); p++; } } catch (InterruptedException e) { } // insert poison to stop the queues - for (int i = 0; i < cores0; i++) worker[i].add(WordReferenceVars.poison); + for (int i = 0; i < this.threads; i++) worker[i].add(WordReferenceVars.poison); } } + /** + * normalize ranking: find minimum and maximum of separate ranking criteria + */ public class NormalizeWorker extends Thread { private final BlockingQueue out; @@ -131,7 +139,34 @@ public class ReferenceOrder { public void run() { try { - addNormalizer(decodedEntries, out); + WordReferenceVars iEntry; + Map doms0 = new HashMap(); + String dom; + Integer count; + final Integer int1 = 1; + while ((iEntry = decodedEntries.take()) != WordReferenceVars.poison) { + out.put(iEntry); + // find min/max + if (min == null) min = iEntry.clone(); else min.min(iEntry); + if (max == null) max = iEntry.clone(); else max.max(iEntry); + // update domcount + dom = new String(iEntry.metadataHash(), 6, 6); + count = doms0.get(dom); + if (count == null) { + doms0.put(dom, int1); + } else { + doms0.put(dom, Integer.valueOf(count.intValue() + 1)); + } + } + + // update domain score + Map.Entry entry; + final Iterator> di = doms0.entrySet().iterator(); + while (di.hasNext()) { + entry = di.next(); + doms.addScore(entry.getKey(), (entry.getValue()).intValue()); + } + if (!doms.isEmpty()) maxdomcount = doms.getMaxScore(); } catch (InterruptedException e) { Log.logException(e); } catch (Exception e) { @@ -146,57 +181,6 @@ public class ReferenceOrder { } } - /** - * normalize ranking: find minimum and maximum of separate ranking criteria - * @param decodedEntries - * @param out - * @throws InterruptedException - */ - public void addNormalizer(BlockingQueue decodedEntries, final BlockingQueue out) throws InterruptedException { - WordReferenceVars iEntry; - Map doms0 = new HashMap(); - String dom; - Integer count; - final Integer int1 = 1; - while ((iEntry = decodedEntries.take()) != WordReferenceVars.poison) { - out.put(iEntry); - // find min/max - if (min == null) min = iEntry.clone(); else min.min(iEntry); - if (max == null) max = iEntry.clone(); else max.max(iEntry); - // update domcount - dom = new String(iEntry.metadataHash(), 6, 6); - count = doms0.get(dom); - if (count == null) { - doms0.put(dom, int1); - } else { - doms0.put(dom, Integer.valueOf(count.intValue() + 1)); - } - } - - // update domain score - Map.Entry entry; - final Iterator> di = doms0.entrySet().iterator(); - while (di.hasNext()) { - entry = di.next(); - doms.addScore(entry.getKey(), (entry.getValue()).intValue()); - } - if (!doms.isEmpty()) this.maxdomcount = doms.getMaxScore(); - } - - public void addNormalizer(WordReferenceVars iEntry, final BlockingQueue out) throws InterruptedException { - out.put(iEntry); - - // find min/max - if (min == null) min = iEntry.clone(); else min.min(iEntry); - if (max == null) max = iEntry.clone(); else max.max(iEntry); - - // update domcount - String dom = new String(iEntry.metadataHash(), 6, 6); - doms.incScore(dom); - - if (!doms.isEmpty()) this.maxdomcount = doms.getMaxScore(); - } - public int authority(final byte[] urlHash) { return (doms.getScore(new String(urlHash, 6, 6)) << 8) / (1 + this.maxdomcount); } diff --git a/source/de/anomic/search/ResultFetcher.java b/source/de/anomic/search/ResultFetcher.java index 86e8d9a34..ec2e4a972 100644 --- a/source/de/anomic/search/ResultFetcher.java +++ b/source/de/anomic/search/ResultFetcher.java @@ -357,10 +357,10 @@ public class ResultFetcher { public ArrayList> completeResults(final long waitingtime) { final long timeout = System.currentTimeMillis() + waitingtime; while ((result.sizeAvailable() < query.neededResults()) && (anyWorkerAlive()) && (System.currentTimeMillis() < timeout)) { - try {Thread.sleep(100);} catch (final InterruptedException e) {} + try {Thread.sleep(20);} catch (final InterruptedException e) {} //System.out.println("+++DEBUG-completeResults+++ sleeping " + 200); } - return this.result.list(this.result.sizeAvailable()); + return this.result.list(Math.min(query.neededResults(), this.result.sizeAvailable())); } public long postRanking( diff --git a/source/de/anomic/yacy/yacyClient.java b/source/de/anomic/yacy/yacyClient.java index 82f14352e..03aa23af2 100644 --- a/source/de/anomic/yacy/yacyClient.java +++ b/source/de/anomic/yacy/yacyClient.java @@ -401,7 +401,7 @@ public final class yacyClient { final long timestamp = System.currentTimeMillis(); SearchResult result; try { - result = searchClient( + result = new SearchResult( yacyNetwork.basicRequestParts(Switchboard.getSwitchboard(), target.hash, crypt.randomSalt()), mySeed, wordhashes, excludehashes, urlhashes, prefer, filter, language, sitehash, authorhash, count, maxDistance, global, partitions, target.getHexHash() + ".yacyh", target.getClusterAddress(), @@ -411,7 +411,6 @@ public final class yacyClient { //yacyCore.peerActions.peerDeparture(target, "search request to peer created io exception: " + e.getMessage()); return -1; } - if (result == null) return -1; // computation time final long totalrequesttime = System.currentTimeMillis() - timestamp; @@ -553,69 +552,6 @@ public final class yacyClient { return result.urlcount; } - public static SearchResult searchClient( - LinkedHashMap parts, - final yacySeed mySeed, - final String wordhashes, - final String excludehashes, - final String urlhashes, - final Pattern prefer, - final Pattern filter, - final String language, - final String sitehash, - final String authorhash, - final int count, - final int maxDistance, - final boolean global, - final int partitions, - String hostname, - String hostaddress, - final SearchEvent.SecondarySearchSuperviser secondarySearchSuperviser, - final RankingProfile rankingProfile, - final Bitfield constraint - ) throws IOException { - // send a search request to peer with remote Hash - - // INPUT: - // iam : complete seed of the requesting peer - // youare : seed hash of the target peer, used for testing network stability - // key : transmission key for response - // search : a list of search words - // hsearch : a string of word hashes - // fwdep : forward depth. if "0" then peer may NOT ask another peer for more results - // fwden : forward deny, a list of seed hashes. They may NOT be target of forward hopping - // count : maximum number of wanted results - // global : if "true", then result may consist of answers from other peers - // partitions : number of remote peers that are asked (for evaluation of QPM) - // duetime : maximum time that a peer should spent to create a result - - // send request - Map resultMap = null; - parts.put("myseed", new StringBody((mySeed == null) ? "" : mySeed.genSeedStr(parts.get("key").toString()))); - parts.put("count", new StringBody(Integer.toString(Math.max(10, count)))); - parts.put("resource", new StringBody(((global) ? "global" : "local"))); - parts.put("partitions", new StringBody(Integer.toString(partitions))); - parts.put("query", new StringBody(wordhashes)); - parts.put("exclude", new StringBody(excludehashes)); - parts.put("duetime", new StringBody("1000")); - parts.put("urls", new StringBody(urlhashes)); - parts.put("prefer", new StringBody(prefer.toString())); - parts.put("filter", new StringBody(filter.toString())); - parts.put("language", new StringBody(language)); - parts.put("sitehash", new StringBody(sitehash)); - parts.put("authorhash", new StringBody(authorhash)); - parts.put("ttl", new StringBody("0")); - parts.put("maxdist", new StringBody(Integer.toString(maxDistance))); - parts.put("profile", new StringBody(crypt.simpleEncode(rankingProfile.toExternalString()))); - parts.put("constraint", new StringBody((constraint == null) ? "" : constraint.exportB64())); - if (secondarySearchSuperviser != null) parts.put("abstracts", new StringBody("auto")); - resultMap = FileUtils.table(HTTPConnector.getConnector(HTTPLoader.crawlerUserAgent).post(new MultiProtocolURI("http://" + hostaddress + "/yacy/search.html"), 60000, hostname, parts)); - //resultMap = FileUtils.table(HTTPConnector.getConnector(HTTPLoader.crawlerUserAgent).post(new MultiProtocolURI("http://" + target.getClusterAddress() + "/yacy/search.html"), 60000, target.getHexHash() + ".yacyh", parts)); - - if (resultMap == null || resultMap.isEmpty()) throw new IOException("resultMap is NULL"); - return new SearchResult(resultMap); - } - public static class SearchResult { public String version; // version : application version of responder @@ -631,11 +567,70 @@ public final class yacyClient { public List links; // LURLs of search public Map indexabstract; // index abstracts, a collection of url-hashes per word - public SearchResult(Map resultMap) throws IOException { + public SearchResult( + LinkedHashMap parts, + final yacySeed mySeed, + final String wordhashes, + final String excludehashes, + final String urlhashes, + final Pattern prefer, + final Pattern filter, + final String language, + final String sitehash, + final String authorhash, + final int count, + final int maxDistance, + final boolean global, + final int partitions, + String hostname, + String hostaddress, + final SearchEvent.SecondarySearchSuperviser secondarySearchSuperviser, + final RankingProfile rankingProfile, + final Bitfield constraint) throws IOException { + // send a search request to peer with remote Hash + + // INPUT: + // iam : complete seed of the requesting peer + // youare : seed hash of the target peer, used for testing network stability + // key : transmission key for response + // search : a list of search words + // hsearch : a string of word hashes + // fwdep : forward depth. if "0" then peer may NOT ask another peer for more results + // fwden : forward deny, a list of seed hashes. They may NOT be target of forward hopping + // count : maximum number of wanted results + // global : if "true", then result may consist of answers from other peers + // partitions : number of remote peers that are asked (for evaluation of QPM) + // duetime : maximum time that a peer should spent to create a result + + // send request + Map resultMap = null; + parts.put("myseed", new StringBody((mySeed == null) ? "" : mySeed.genSeedStr(parts.get("key").toString()))); + parts.put("count", new StringBody(Integer.toString(Math.max(10, count)))); + parts.put("resource", new StringBody(((global) ? "global" : "local"))); + parts.put("partitions", new StringBody(Integer.toString(partitions))); + parts.put("query", new StringBody(wordhashes)); + parts.put("exclude", new StringBody(excludehashes)); + parts.put("duetime", new StringBody("1000")); + parts.put("urls", new StringBody(urlhashes)); + parts.put("prefer", new StringBody(prefer.toString())); + parts.put("filter", new StringBody(filter.toString())); + parts.put("language", new StringBody(language)); + parts.put("sitehash", new StringBody(sitehash)); + parts.put("authorhash", new StringBody(authorhash)); + parts.put("ttl", new StringBody("0")); + parts.put("maxdist", new StringBody(Integer.toString(maxDistance))); + parts.put("profile", new StringBody(crypt.simpleEncode(rankingProfile.toExternalString()))); + parts.put("constraint", new StringBody((constraint == null) ? "" : constraint.exportB64())); + if (secondarySearchSuperviser != null) parts.put("abstracts", new StringBody("auto")); + resultMap = FileUtils.table(HTTPConnector.getConnector(HTTPLoader.crawlerUserAgent).post(new MultiProtocolURI("http://" + hostaddress + "/yacy/search.html"), 60000, hostname, parts)); + //resultMap = FileUtils.table(HTTPConnector.getConnector(HTTPLoader.crawlerUserAgent).post(new MultiProtocolURI("http://" + target.getClusterAddress() + "/yacy/search.html"), 60000, target.getHexHash() + ".yacyh", parts)); + + // evaluate request result + if (resultMap == null || resultMap.isEmpty()) throw new IOException("resultMap is NULL"); try { this.searchtime = Integer.parseInt(resultMap.get("searchtime")); } catch (final NumberFormatException e) { - throw new IOException("wrong output format for searchtime: " + e.getMessage()); + throw new IOException("wrong output format for searchtime: " + e.getMessage() + ", map = " + resultMap.toString()); } try { this.joincount = Integer.parseInt(resultMap.get("joincount")); // the complete number of hits at remote site @@ -1104,7 +1099,7 @@ public final class yacyClient { long time = System.currentTimeMillis(); SearchResult result; try { - result = searchClient( + result = new SearchResult( yacyNetwork.basicRequestParts((String) null, (String) null, "freeworld"), null, // sb.peers.mySeed(), new String(wordhashe), @@ -1124,13 +1119,9 @@ public final class yacyClient { new RankingProfile(ContentDomain.TEXT), // rankingProfile, null // constraint); ); - if (result == null) { - System.out.println("no response"); - } else { - for (URIMetadataRow link: result.links) { + for (URIMetadataRow link: result.links) { System.out.println(link.metadata().url().toNormalform(true, false)); System.out.println(link.snippet()); - } } } catch (IOException e) { // TODO Auto-generated catch block diff --git a/source/net/yacy/kelondro/data/word/WordReferenceVars.java b/source/net/yacy/kelondro/data/word/WordReferenceVars.java index 65afe8bb1..207cf0a96 100644 --- a/source/net/yacy/kelondro/data/word/WordReferenceVars.java +++ b/source/net/yacy/kelondro/data/word/WordReferenceVars.java @@ -397,30 +397,30 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc */ public static BlockingQueue transform(ReferenceContainer container) { - LinkedBlockingQueue out = new LinkedBlockingQueue(); + LinkedBlockingQueue vars = new LinkedBlockingQueue(); if (container.size() <= 100) { // transform without concurrency to omit thread creation overhead for (Row.Entry entry: container) try { - out.put(new WordReferenceVars(new WordReferenceRow(entry))); + vars.put(new WordReferenceVars(new WordReferenceRow(entry))); } catch (InterruptedException e) {} try { - out.put(WordReferenceVars.poison); + vars.put(WordReferenceVars.poison); } catch (InterruptedException e) {} - return out; + return vars; } - Thread distributor = new TransformDistributor(container, out); + Thread distributor = new TransformDistributor(container, vars); distributor.start(); // return the resulting queue while the processing queues are still working - return out; + return vars; } public static class TransformDistributor extends Thread { ReferenceContainer container; - LinkedBlockingQueue out; + BlockingQueue out; - public TransformDistributor(ReferenceContainer container, LinkedBlockingQueue out) { + public TransformDistributor(ReferenceContainer container, BlockingQueue out) { this.container = container; this.out = out; }