making sure that crawl queues are closed correctly to prevent data loss

pull/436/head
Michael Peter Christen 3 years ago
parent 90507c0fdc
commit d19872fd26

@ -155,6 +155,7 @@ public class HostBalancer implements Balancer {
@Override
public synchronized void close() {
log.info("closing all HostBalancer queues (" + this.queues.size() + ") for hostPath " + this.hostsPath);
if (depthCache != null) {
depthCache.clear();
}

@ -288,6 +288,7 @@ public class HostQueue implements Balancer {
@Override
public synchronized void close() {
log.info("closing HostQueue, closing " + this.depthStacks.size() + " depthStacks for host " + this.hostName);
for (final Map.Entry<Integer, Index> entry: this.depthStacks.entrySet()) {
final int size = entry.getValue().size();
entry.getValue().close();

@ -61,7 +61,7 @@ public class BufferedObjectIndex implements Index, Iterable<Row.Entry> {
public boolean isOnDemand() {
return this.backend instanceof OnDemandOpenFileIndex;
}
@Override
public byte[] smallestKey() {
if (this.buffer == null || this.buffer.isEmpty()) return this.backend.smallestKey();
@ -78,8 +78,12 @@ public class BufferedObjectIndex implements Index, Iterable<Row.Entry> {
private final void flushBuffer() throws IOException, SpaceExceededException {
if (!this.buffer.isEmpty()) {
for (final Row.Entry e: this.buffer) {
this.backend.put(e);
if (this.backend instanceof OnDemandOpenFileIndex) {
((OnDemandOpenFileIndex) this.backend).put(this.buffer);
} else {
for (final Row.Entry e: this.buffer) {
this.backend.put(e);
}
}
this.buffer.clear();
}
@ -90,7 +94,7 @@ public class BufferedObjectIndex implements Index, Iterable<Row.Entry> {
this.backend.optimize();
this.buffer.optimize();
}
@Override
public long mem() {
return this.backend.mem() + this.buffer.mem();

@ -7,12 +7,12 @@
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program in the file lgpl21.txt
* If not, see <http://www.gnu.org/licenses/>.
@ -58,7 +58,7 @@ public class OnDemandOpenFileIndex implements Index, Iterable<Row.Entry> {
private Index getIndex() {
try {
return new Table(file, rowdef, 1000, 0, false, exceed134217727, false);
return new Table(this.file, this.rowdef, 1000, 0, false, this.exceed134217727, false);
} catch (kelondroException e) {
ConcurrentLog.logException(e);
return null;
@ -67,7 +67,7 @@ public class OnDemandOpenFileIndex implements Index, Iterable<Row.Entry> {
return null;
}
}
@Override
public synchronized byte[] smallestKey() {
Index index = getIndex();
@ -93,7 +93,7 @@ public class OnDemandOpenFileIndex implements Index, Iterable<Row.Entry> {
index.optimize();
index.close();
}
@Override
public synchronized long mem() {
Index index = getIndex();
@ -133,6 +133,7 @@ public class OnDemandOpenFileIndex implements Index, Iterable<Row.Entry> {
@Override
public synchronized void close() {
// there is actually nothing to do here because this class does not hold any data
}
@Override
@ -149,7 +150,7 @@ public class OnDemandOpenFileIndex implements Index, Iterable<Row.Entry> {
@Override
public synchronized int size() {
if (sizecache >= 0) return sizecache;
if (this.sizecache >= 0) return this.sizecache;
Index index = getIndex();
if (index == null) return 0;
int i = index.size();
@ -227,6 +228,31 @@ public class OnDemandOpenFileIndex implements Index, Iterable<Row.Entry> {
}
}
/**
* Mass-put method in case that a larger amount of rows should be stored.
* This is the case in the BufferedObjectIndex class where a write buffer is flushed at once.
* Without a mass-backend to store the data, the put would be called many times where each time the file is opened and closed.
* This should speed-up the process.
* @param rowset
* @throws IOException
* @throws SpaceExceededException
*/
public synchronized void put(final RowSet rowset) throws IOException, SpaceExceededException {
Index index = getIndex();
if (index == null) return;
try {
for (final Row.Entry row: rowset) {
boolean b = index.put(row);
if (this.sizecache >= 0 && b) this.sizecache++;
}
return;
} catch (IOException e) {
throw e;
} finally {
index.close();
}
}
@Override
public synchronized Entry remove(final byte[] key) throws IOException {
Index index = getIndex();

Loading…
Cancel
Save