refactoring and enhanced concurrency

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@7155 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 15 years ago
parent 83ac07874f
commit 0cf006865e

@ -64,9 +64,10 @@ public class ReferenceOrder {
} }
public BlockingQueue<WordReferenceVars> normalizeWith(final ReferenceContainer<WordReference> container) { public BlockingQueue<WordReferenceVars> normalizeWith(final ReferenceContainer<WordReference> container) {
BlockingQueue<WordReferenceVars> vars = WordReferenceVars.transform(container);
LinkedBlockingQueue<WordReferenceVars> out = new LinkedBlockingQueue<WordReferenceVars>(); LinkedBlockingQueue<WordReferenceVars> out = new LinkedBlockingQueue<WordReferenceVars>();
Thread distributor = new NormalizeDistributor(vars, out); int threads = cores + 1;
if (container.size() < 20) threads = 2;
Thread distributor = new NormalizeDistributor(container, out, threads);
distributor.start(); distributor.start();
// return the resulting queue while the processing queues are still working // return the resulting queue while the processing queues are still working
@ -75,21 +76,25 @@ public class ReferenceOrder {
public class NormalizeDistributor extends Thread { public class NormalizeDistributor extends Thread {
BlockingQueue<WordReferenceVars> vars; ReferenceContainer<WordReference> container;
LinkedBlockingQueue<WordReferenceVars> out; LinkedBlockingQueue<WordReferenceVars> out;
private int threads;
public NormalizeDistributor(BlockingQueue<WordReferenceVars> vars, LinkedBlockingQueue<WordReferenceVars> out) { public NormalizeDistributor(ReferenceContainer<WordReference> container, LinkedBlockingQueue<WordReferenceVars> out, int threads) {
this.vars = vars; this.container = container;
this.out = out; this.out = out;
this.threads = threads;
} }
@Override @Override
public void run() { public void run() {
// transform the reference container into a stream of parsed entries
BlockingQueue<WordReferenceVars> vars = WordReferenceVars.transform(container);
// start the transformation threads // start the transformation threads
int cores0 = cores + 1; Semaphore termination = new Semaphore(this.threads);
Semaphore termination = new Semaphore(cores0); NormalizeWorker[] worker = new NormalizeWorker[this.threads];
NormalizeWorker[] worker = new NormalizeWorker[cores0]; for (int i = 0; i < this.threads; i++) {
for (int i = 0; i < cores0; i++) {
worker[i] = new NormalizeWorker(out, termination); worker[i] = new NormalizeWorker(out, termination);
worker[i].start(); worker[i].start();
} }
@ -99,17 +104,20 @@ public class ReferenceOrder {
int p = 0; int p = 0;
try { try {
while ((iEntry = vars.take()) != WordReferenceVars.poison) { while ((iEntry = vars.take()) != WordReferenceVars.poison) {
worker[p % cores0].add(iEntry); worker[p % this.threads].add(iEntry);
p++; p++;
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
} }
// insert poison to stop the queues // insert poison to stop the queues
for (int i = 0; i < cores0; i++) worker[i].add(WordReferenceVars.poison); for (int i = 0; i < this.threads; i++) worker[i].add(WordReferenceVars.poison);
} }
} }
/**
* normalize ranking: find minimum and maximum of separate ranking criteria
*/
public class NormalizeWorker extends Thread { public class NormalizeWorker extends Thread {
private final BlockingQueue<WordReferenceVars> out; private final BlockingQueue<WordReferenceVars> out;
@ -131,28 +139,6 @@ public class ReferenceOrder {
public void run() { public void run() {
try { try {
addNormalizer(decodedEntries, out);
} catch (InterruptedException e) {
Log.logException(e);
} catch (Exception e) {
Log.logException(e);
} finally {
// insert poison to signal the termination to next queue
try {
this.termination.acquire();
if (this.termination.availablePermits() == 0) this.out.put(WordReferenceVars.poison);
} catch (InterruptedException e) {}
}
}
}
/**
* normalize ranking: find minimum and maximum of separate ranking criteria
* @param decodedEntries
* @param out
* @throws InterruptedException
*/
public void addNormalizer(BlockingQueue<WordReferenceVars> decodedEntries, final BlockingQueue<WordReferenceVars> out) throws InterruptedException {
WordReferenceVars iEntry; WordReferenceVars iEntry;
Map<String, Integer> doms0 = new HashMap<String, Integer>(); Map<String, Integer> doms0 = new HashMap<String, Integer>();
String dom; String dom;
@ -180,21 +166,19 @@ public class ReferenceOrder {
entry = di.next(); entry = di.next();
doms.addScore(entry.getKey(), (entry.getValue()).intValue()); doms.addScore(entry.getKey(), (entry.getValue()).intValue());
} }
if (!doms.isEmpty()) this.maxdomcount = doms.getMaxScore(); if (!doms.isEmpty()) maxdomcount = doms.getMaxScore();
} catch (InterruptedException e) {
Log.logException(e);
} catch (Exception e) {
Log.logException(e);
} finally {
// insert poison to signal the termination to next queue
try {
this.termination.acquire();
if (this.termination.availablePermits() == 0) this.out.put(WordReferenceVars.poison);
} catch (InterruptedException e) {}
}
} }
public void addNormalizer(WordReferenceVars iEntry, final BlockingQueue<WordReferenceVars> out) throws InterruptedException {
out.put(iEntry);
// find min/max
if (min == null) min = iEntry.clone(); else min.min(iEntry);
if (max == null) max = iEntry.clone(); else max.max(iEntry);
// update domcount
String dom = new String(iEntry.metadataHash(), 6, 6);
doms.incScore(dom);
if (!doms.isEmpty()) this.maxdomcount = doms.getMaxScore();
} }
public int authority(final byte[] urlHash) { public int authority(final byte[] urlHash) {

@ -357,10 +357,10 @@ public class ResultFetcher {
public ArrayList<ReverseElement<ResultEntry>> completeResults(final long waitingtime) { public ArrayList<ReverseElement<ResultEntry>> completeResults(final long waitingtime) {
final long timeout = System.currentTimeMillis() + waitingtime; final long timeout = System.currentTimeMillis() + waitingtime;
while ((result.sizeAvailable() < query.neededResults()) && (anyWorkerAlive()) && (System.currentTimeMillis() < timeout)) { while ((result.sizeAvailable() < query.neededResults()) && (anyWorkerAlive()) && (System.currentTimeMillis() < timeout)) {
try {Thread.sleep(100);} catch (final InterruptedException e) {} try {Thread.sleep(20);} catch (final InterruptedException e) {}
//System.out.println("+++DEBUG-completeResults+++ sleeping " + 200); //System.out.println("+++DEBUG-completeResults+++ sleeping " + 200);
} }
return this.result.list(this.result.sizeAvailable()); return this.result.list(Math.min(query.neededResults(), this.result.sizeAvailable()));
} }
public long postRanking( public long postRanking(

@ -401,7 +401,7 @@ public final class yacyClient {
final long timestamp = System.currentTimeMillis(); final long timestamp = System.currentTimeMillis();
SearchResult result; SearchResult result;
try { try {
result = searchClient( result = new SearchResult(
yacyNetwork.basicRequestParts(Switchboard.getSwitchboard(), target.hash, crypt.randomSalt()), yacyNetwork.basicRequestParts(Switchboard.getSwitchboard(), target.hash, crypt.randomSalt()),
mySeed, wordhashes, excludehashes, urlhashes, prefer, filter, language, mySeed, wordhashes, excludehashes, urlhashes, prefer, filter, language,
sitehash, authorhash, count, maxDistance, global, partitions, target.getHexHash() + ".yacyh", target.getClusterAddress(), sitehash, authorhash, count, maxDistance, global, partitions, target.getHexHash() + ".yacyh", target.getClusterAddress(),
@ -411,7 +411,6 @@ public final class yacyClient {
//yacyCore.peerActions.peerDeparture(target, "search request to peer created io exception: " + e.getMessage()); //yacyCore.peerActions.peerDeparture(target, "search request to peer created io exception: " + e.getMessage());
return -1; return -1;
} }
if (result == null) return -1;
// computation time // computation time
final long totalrequesttime = System.currentTimeMillis() - timestamp; final long totalrequesttime = System.currentTimeMillis() - timestamp;
@ -553,7 +552,22 @@ public final class yacyClient {
return result.urlcount; return result.urlcount;
} }
public static SearchResult searchClient( public static class SearchResult {
public String version; // version : application version of responder
public String uptime; // uptime : uptime in seconds of responder
public String fwhop; // hops (depth) of forwards that had been performed to construct this result
public String fwsrc; // peers that helped to construct this result
public String fwrec; // peers that would have helped to construct this result (recommendations)
public int urlcount; // number of returned LURL's for this search
public int joincount; //
public Map<byte[], Integer> indexcount; //
public long searchtime; // time that the peer actually spent to create the result
public String[] references; // search hints, the top-words
public List<URIMetadataRow> links; // LURLs of search
public Map<byte[], String> indexabstract; // index abstracts, a collection of url-hashes per word
public SearchResult(
LinkedHashMap<String,ContentBody> parts, LinkedHashMap<String,ContentBody> parts,
final yacySeed mySeed, final yacySeed mySeed,
final String wordhashes, final String wordhashes,
@ -572,8 +586,7 @@ public final class yacyClient {
String hostaddress, String hostaddress,
final SearchEvent.SecondarySearchSuperviser secondarySearchSuperviser, final SearchEvent.SecondarySearchSuperviser secondarySearchSuperviser,
final RankingProfile rankingProfile, final RankingProfile rankingProfile,
final Bitfield constraint final Bitfield constraint) throws IOException {
) throws IOException {
// send a search request to peer with remote Hash // send a search request to peer with remote Hash
// INPUT: // INPUT:
@ -612,30 +625,12 @@ public final class yacyClient {
resultMap = FileUtils.table(HTTPConnector.getConnector(HTTPLoader.crawlerUserAgent).post(new MultiProtocolURI("http://" + hostaddress + "/yacy/search.html"), 60000, hostname, parts)); resultMap = FileUtils.table(HTTPConnector.getConnector(HTTPLoader.crawlerUserAgent).post(new MultiProtocolURI("http://" + hostaddress + "/yacy/search.html"), 60000, hostname, parts));
//resultMap = FileUtils.table(HTTPConnector.getConnector(HTTPLoader.crawlerUserAgent).post(new MultiProtocolURI("http://" + target.getClusterAddress() + "/yacy/search.html"), 60000, target.getHexHash() + ".yacyh", parts)); //resultMap = FileUtils.table(HTTPConnector.getConnector(HTTPLoader.crawlerUserAgent).post(new MultiProtocolURI("http://" + target.getClusterAddress() + "/yacy/search.html"), 60000, target.getHexHash() + ".yacyh", parts));
// evaluate request result
if (resultMap == null || resultMap.isEmpty()) throw new IOException("resultMap is NULL"); if (resultMap == null || resultMap.isEmpty()) throw new IOException("resultMap is NULL");
return new SearchResult(resultMap);
}
public static class SearchResult {
public String version; // version : application version of responder
public String uptime; // uptime : uptime in seconds of responder
public String fwhop; // hops (depth) of forwards that had been performed to construct this result
public String fwsrc; // peers that helped to construct this result
public String fwrec; // peers that would have helped to construct this result (recommendations)
public int urlcount; // number of returned LURL's for this search
public int joincount; //
public Map<byte[], Integer> indexcount; //
public long searchtime; // time that the peer actually spent to create the result
public String[] references; // search hints, the top-words
public List<URIMetadataRow> links; // LURLs of search
public Map<byte[], String> indexabstract; // index abstracts, a collection of url-hashes per word
public SearchResult(Map<String, String> resultMap) throws IOException {
try { try {
this.searchtime = Integer.parseInt(resultMap.get("searchtime")); this.searchtime = Integer.parseInt(resultMap.get("searchtime"));
} catch (final NumberFormatException e) { } catch (final NumberFormatException e) {
throw new IOException("wrong output format for searchtime: " + e.getMessage()); throw new IOException("wrong output format for searchtime: " + e.getMessage() + ", map = " + resultMap.toString());
} }
try { try {
this.joincount = Integer.parseInt(resultMap.get("joincount")); // the complete number of hits at remote site this.joincount = Integer.parseInt(resultMap.get("joincount")); // the complete number of hits at remote site
@ -1104,7 +1099,7 @@ public final class yacyClient {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
SearchResult result; SearchResult result;
try { try {
result = searchClient( result = new SearchResult(
yacyNetwork.basicRequestParts((String) null, (String) null, "freeworld"), yacyNetwork.basicRequestParts((String) null, (String) null, "freeworld"),
null, // sb.peers.mySeed(), null, // sb.peers.mySeed(),
new String(wordhashe), new String(wordhashe),
@ -1124,14 +1119,10 @@ public final class yacyClient {
new RankingProfile(ContentDomain.TEXT), // rankingProfile, new RankingProfile(ContentDomain.TEXT), // rankingProfile,
null // constraint); null // constraint);
); );
if (result == null) {
System.out.println("no response");
} else {
for (URIMetadataRow link: result.links) { for (URIMetadataRow link: result.links) {
System.out.println(link.metadata().url().toNormalform(true, false)); System.out.println(link.metadata().url().toNormalform(true, false));
System.out.println(link.snippet()); System.out.println(link.snippet());
} }
}
} catch (IOException e) { } catch (IOException e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();

@ -397,30 +397,30 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc
*/ */
public static BlockingQueue<WordReferenceVars> transform(ReferenceContainer<WordReference> container) { public static BlockingQueue<WordReferenceVars> transform(ReferenceContainer<WordReference> container) {
LinkedBlockingQueue<WordReferenceVars> out = new LinkedBlockingQueue<WordReferenceVars>(); LinkedBlockingQueue<WordReferenceVars> vars = new LinkedBlockingQueue<WordReferenceVars>();
if (container.size() <= 100) { if (container.size() <= 100) {
// transform without concurrency to omit thread creation overhead // transform without concurrency to omit thread creation overhead
for (Row.Entry entry: container) try { for (Row.Entry entry: container) try {
out.put(new WordReferenceVars(new WordReferenceRow(entry))); vars.put(new WordReferenceVars(new WordReferenceRow(entry)));
} catch (InterruptedException e) {} } catch (InterruptedException e) {}
try { try {
out.put(WordReferenceVars.poison); vars.put(WordReferenceVars.poison);
} catch (InterruptedException e) {} } catch (InterruptedException e) {}
return out; return vars;
} }
Thread distributor = new TransformDistributor(container, out); Thread distributor = new TransformDistributor(container, vars);
distributor.start(); distributor.start();
// return the resulting queue while the processing queues are still working // return the resulting queue while the processing queues are still working
return out; return vars;
} }
public static class TransformDistributor extends Thread { public static class TransformDistributor extends Thread {
ReferenceContainer<WordReference> container; ReferenceContainer<WordReference> container;
LinkedBlockingQueue<WordReferenceVars> out; BlockingQueue<WordReferenceVars> out;
public TransformDistributor(ReferenceContainer<WordReference> container, LinkedBlockingQueue<WordReferenceVars> out) { public TransformDistributor(ReferenceContainer<WordReference> container, BlockingQueue<WordReferenceVars> out) {
this.container = container; this.container = container;
this.out = out; this.out = out;
} }

Loading…
Cancel
Save