- added another enumeration method in kelondro data structure to get a

more random access to data for the balancer
- added random access inside the balancer
pull/1/head
Michael Peter Christen 12 years ago
parent 4eab3aae60
commit 5e182a566f

@ -43,7 +43,6 @@ import net.yacy.cora.document.ASCII;
import net.yacy.cora.document.UTF8;
import net.yacy.cora.federate.yacy.CacheStrategy;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.order.CloneableIterator;
import net.yacy.cora.protocol.Domains;
import net.yacy.cora.sorting.OrderedScoreMap;
import net.yacy.cora.storage.HandleSet;
@ -481,6 +480,7 @@ public class Balancer {
rest = rest + 1000 * loops;
loops = 0;
}
Thread.currentThread().setName("Balancer waiting for " +crawlEntry.url().getHost() + ": " + sleeptime + " milliseconds");
synchronized(this) {
// must be synchronized here to avoid 'takeover' moves from other threads which then idle the same time which would not be enough
if (rest > 0) {try {this.wait(rest);} catch (final InterruptedException e) {}}
@ -618,20 +618,16 @@ public class Balancer {
this.lastDomainStackFill = System.currentTimeMillis();
//final HandleSet handles = this.urlFileIndex.keysFromBuffer(objectIndexBufferSize / 2);
//final CloneableIterator<byte[]> i = handles.keys(true, null);
final CloneableIterator<byte[]> i = this.urlFileIndex.keys(true, null);
byte[] handle;
String host;
Request request;
int count = 0;
long timeout = System.currentTimeMillis() + 5000;
while (i.hasNext()) {
handle = i.next();
final Row.Entry entry = this.urlFileIndex.get(handle, false);
for (Row.Entry entry: this.urlFileIndex.random(10000)) {
if (entry == null) continue;
request = new Request(entry);
host = request.url().getHost();
try {
pushHashToDomainStacks(host, request.url().hosthash(), handle);
pushHashToDomainStacks(host, request.url().hosthash(), entry.getPrimaryKeyBytes());
} catch (final SpaceExceededException e) {
break;
}

@ -230,6 +230,18 @@ public class BufferedObjectIndex implements Index, Iterable<Row.Entry> {
return list;
}
@Override
public List<Row.Entry> random(final int count) throws IOException {
final List<Row.Entry> list = new ArrayList<Row.Entry>();
synchronized (this.backend) {
List<Row.Entry> list0 = this.buffer.random(count);
list.addAll(list0);
list0 = this.backend.random(count - list.size());
list.addAll(list0);
}
return list;
}
@Override
public Entry removeOne() throws IOException {
synchronized (this.backend) {

@ -585,6 +585,11 @@ public final class Cache implements Index, Iterable<Row.Entry> {
return this.index.top(count);
}
@Override
public synchronized List<Row.Entry> random(final int count) throws IOException {
return this.index.random(count);
}
@Override
public final synchronized Row row() {
return this.index.row();

@ -62,6 +62,7 @@ public interface Index extends Iterable<Row.Entry> {
public Row.Entry remove(byte[] key) throws IOException;
public Row.Entry removeOne() throws IOException;
public List<Row.Entry> top(int count) throws IOException;
public List<Row.Entry> random(int count) throws IOException;
public CloneableIterator<byte[]> keys(boolean up, byte[] firstKey) throws IOException; // iterates only the key
public CloneableIterator<Row.Entry> rows(boolean up, byte[] firstKey) throws IOException; // iterates the whole row using the order of the keys
public CloneableIterator<Row.Entry> rows() throws IOException; // iterates the whole row without any order

@ -302,6 +302,16 @@ public final class RAMIndex implements Index, Iterable<Row.Entry> {
return list;
}
@Override
public synchronized List<Row.Entry> random(final int count) throws IOException {
final List<Row.Entry> list = new ArrayList<Row.Entry>();
List<Row.Entry> list0 = this.index1.random(count);
list.addAll(list0);
list0 = this.index0.random(count - list.size());
list.addAll(list0);
return list;
}
@Override
public long mem() {
if (this.index0 != null && this.index1 == null) {

@ -283,6 +283,25 @@ public final class RAMIndexCluster implements Index, Iterable<Row.Entry>, Clonea
return list;
}
@Override
public List<Row.Entry> random(final int count) {
final List<Row.Entry> list = new ArrayList<Row.Entry>();
synchronized (this.cluster) {
for (final RAMIndex element : this.cluster) {
if (element != null) {
try {
final List<Row.Entry> list0 = element.random(count - list.size());
list.addAll(list0);
} catch (final IOException e) {
continue;
}
}
if (list.size() >= count) return list;
}
}
return list;
}
@Override
public final Entry replace(final Entry row) throws SpaceExceededException {
final int i = indexFor(row);

@ -504,8 +504,9 @@ public class RowCollection implements Sortable<Row.Entry>, Iterable<Row.Entry>,
}
public synchronized List<Row.Entry> top(int count) {
if (count > this.chunkcount) count = this.chunkcount;
final ArrayList<Row.Entry> list = new ArrayList<Row.Entry>();
if (this.chunkcount == 0) return list;
if (this.chunkcount == 0 || count == 0) return list;
Row.Entry entry;
int cursor = this.chunkcount - 1;
while (count > 0 && cursor >= 0) {
@ -517,6 +518,22 @@ public class RowCollection implements Sortable<Row.Entry>, Iterable<Row.Entry>,
return list;
}
public synchronized List<Row.Entry> random(int count) {
if (count > this.chunkcount) count = this.chunkcount;
final ArrayList<Row.Entry> list = new ArrayList<Row.Entry>();
if (this.chunkcount == 0 || count == 0) return list;
Row.Entry entry;
int cursor = 0;
int stepsize = this.chunkcount / count;
while (count > 0 && cursor < this.chunkcount) {
entry = get(cursor, true);
list.add(entry);
count--;
cursor += stepsize;
}
return list;
}
public synchronized byte[] smallestKey() {
if (this.chunkcount == 0) return null;
sort();

@ -311,6 +311,11 @@ public class SQLTable implements Index, Iterable<Row.Entry> {
return null;
}
@Override
public List<Row.Entry> random(final int count) throws IOException {
return null;
}
@Override
public CloneableIterator<Row.Entry> rows(final boolean up, final byte[] startKey) throws IOException {
// Objects are of type kelondroRow.Entry

@ -507,6 +507,26 @@ public class SplitTable implements Index, Iterable<Row.Entry> {
}
}
@Override
public List<Row.Entry> random(final int count) throws IOException {
final Iterator<Index> i = this.tables.values().iterator();
Index table, maxtable = null;
int maxcount = -1;
while (i.hasNext()) {
table = i.next();
if (table.size() > maxcount) {
maxtable = table;
maxcount = table.size();
}
}
if (maxtable == null) {
return null;
}
synchronized (this) { // avoid concurrent IO from different methods
return maxtable.random(count);
}
}
@Override
public CloneableIterator<byte[]> keys(final boolean up, final byte[] firstKey) throws IOException {
final List<CloneableIterator<byte[]>> c = new ArrayList<CloneableIterator<byte[]>>(this.tables.size());

@ -833,8 +833,9 @@ public class Table implements Index, Iterable<Row.Entry> {
@Override
public List<Row.Entry> top(int count) throws IOException {
if (count > this.size()) count = this.size();
final ArrayList<Row.Entry> list = new ArrayList<Row.Entry>();
if ((this.file == null) || (this.index == null)) return list;
if (this.file == null || this.index == null || this.size() == 0 || count == 0) return list;
long i = this.file.size() - 1;
while (count > 0 && i >= 0) {
final byte[] b = new byte[this.rowdef.objectsize];
@ -846,6 +847,23 @@ public class Table implements Index, Iterable<Row.Entry> {
return list;
}
@Override
public List<Row.Entry> random(int count) throws IOException {
if (count > this.size()) count = this.size();
final ArrayList<Row.Entry> list = new ArrayList<Row.Entry>();
if (this.file == null || this.index == null || this.size() == 0 || count == 0) return list;
long cursor = 0;
int stepsize = this.size() / count;
while (count > 0 && cursor < this.size()) {
final byte[] b = new byte[this.rowdef.objectsize];
this.file.get(cursor, b, 0);
list.add(this.rowdef.newEntry(b));
count--;
cursor += stepsize;
}
return list;
}
@Override
public synchronized void clear() throws IOException {
final File f = this.file.filename();

Loading…
Cancel
Save