|
|
|
@ -39,6 +39,7 @@ import de.anomic.kelondro.text.MetadataRepository;
|
|
|
|
|
import de.anomic.kelondro.text.Segment;
|
|
|
|
|
import de.anomic.kelondro.text.referencePrototype.WordReference;
|
|
|
|
|
import de.anomic.kelondro.text.referencePrototype.WordReferenceRow;
|
|
|
|
|
import de.anomic.kelondro.util.ByteArray;
|
|
|
|
|
import de.anomic.server.serverProcessor;
|
|
|
|
|
import de.anomic.yacy.yacySeed;
|
|
|
|
|
import de.anomic.yacy.yacySeedDB;
|
|
|
|
@ -80,7 +81,7 @@ public class Dispatcher {
|
|
|
|
|
|
|
|
|
|
// a cloud is a cache for the objects that wait to be transmitted
|
|
|
|
|
// the String-key is the primary target as contained in the Entry
|
|
|
|
|
private Map<byte[], Transmission.Chunk> transmissionCloud;
|
|
|
|
|
private Map<ByteArray, Transmission.Chunk> transmissionCloud;
|
|
|
|
|
|
|
|
|
|
// the backend is used to store the remaining indexContainers in case that the object is closed
|
|
|
|
|
private BufferedIndex<WordReference> backend;
|
|
|
|
@ -104,7 +105,7 @@ public class Dispatcher {
|
|
|
|
|
final boolean gzipBody,
|
|
|
|
|
final int timeout
|
|
|
|
|
) {
|
|
|
|
|
this.transmissionCloud = new LinkedHashMap<byte[], Transmission.Chunk>();
|
|
|
|
|
this.transmissionCloud = new LinkedHashMap<ByteArray, Transmission.Chunk>();
|
|
|
|
|
this.backend = backend;
|
|
|
|
|
this.seeds = seeds;
|
|
|
|
|
this.log = new Log("INDEX-TRANSFER-DISPATCHER");
|
|
|
|
@ -275,15 +276,17 @@ public class Dispatcher {
|
|
|
|
|
if (transmissionCloud == null) return;
|
|
|
|
|
ReferenceContainer<WordReference> lastContainer;
|
|
|
|
|
byte[] primaryTarget;
|
|
|
|
|
ByteArray pTArray;
|
|
|
|
|
Transmission.Chunk entry;
|
|
|
|
|
for (int vertical = 0; vertical < containers.length; vertical++) {
|
|
|
|
|
// the 'new' primary target is the word hash of the last container
|
|
|
|
|
lastContainer = containers[vertical].get(containers[vertical].size() - 1);
|
|
|
|
|
primaryTarget = FlatWordPartitionScheme.positionToHash(this.seeds.scheme.dhtPosition(lastContainer.getTermHash(), vertical));
|
|
|
|
|
assert primaryTarget[2] != '@';
|
|
|
|
|
pTArray = new ByteArray(primaryTarget);
|
|
|
|
|
|
|
|
|
|
// get or make a entry object
|
|
|
|
|
entry = this.transmissionCloud.get(primaryTarget); // if this is not null, the entry is extended here
|
|
|
|
|
entry = this.transmissionCloud.get(pTArray); // if this is not null, the entry is extended here
|
|
|
|
|
ArrayList<yacySeed> targets = PeerSelection.getAcceptRemoteIndexSeedsList(
|
|
|
|
|
seeds,
|
|
|
|
|
primaryTarget,
|
|
|
|
@ -306,7 +309,7 @@ public class Dispatcher {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// put the entry into the cloud
|
|
|
|
|
if (entry.containersSize() > 0) this.transmissionCloud.put(primaryTarget, entry);
|
|
|
|
|
if (entry.containersSize() > 0) this.transmissionCloud.put(pTArray, entry);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -352,9 +355,9 @@ public class Dispatcher {
|
|
|
|
|
public boolean dequeueContainer() {
|
|
|
|
|
if (transmissionCloud == null) return false;
|
|
|
|
|
if (this.indexingTransmissionProcessor.queueSize() > indexingTransmissionProcessor.concurrency()) return false;
|
|
|
|
|
byte[] maxtarget = null;
|
|
|
|
|
ByteArray maxtarget = null;
|
|
|
|
|
int maxsize = -1;
|
|
|
|
|
for (Map.Entry<byte[], Transmission.Chunk> chunk: this.transmissionCloud.entrySet()) {
|
|
|
|
|
for (Map.Entry<ByteArray, Transmission.Chunk> chunk: this.transmissionCloud.entrySet()) {
|
|
|
|
|
if (chunk.getValue().containersSize() > maxsize) {
|
|
|
|
|
maxsize = chunk.getValue().containersSize();
|
|
|
|
|
maxtarget = chunk.getKey();
|
|
|
|
@ -401,7 +404,7 @@ public class Dispatcher {
|
|
|
|
|
// removes all entries from the dispatcher and puts them back to a RAMRI
|
|
|
|
|
if (indexingTransmissionProcessor != null) this.indexingTransmissionProcessor.announceShutdown();
|
|
|
|
|
if (this.transmissionCloud != null) {
|
|
|
|
|
for (Map.Entry<byte[], Transmission.Chunk> e : this.transmissionCloud.entrySet()) {
|
|
|
|
|
for (Map.Entry<ByteArray, Transmission.Chunk> e : this.transmissionCloud.entrySet()) {
|
|
|
|
|
for (ReferenceContainer<WordReference> i : e.getValue()) try {this.backend.add(i);} catch (IOException e1) {}
|
|
|
|
|
}
|
|
|
|
|
this.transmissionCloud.clear();
|
|
|
|
|