- added a new retry connector for solr (for cases where solr responses are slow)

- added a new exist property into the metadataRepository which includes solr entries

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@8016 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 13 years ago
parent 62e674af50
commit e58438c01c

@ -88,7 +88,7 @@ public class IndexFederated_p {
// switch on
final boolean usesolr = sb.getConfigBool("federated.service.solr.indexing.enabled", false) & solrurls.length() > 0;
try {
sb.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectSolr((usesolr) ? new SolrChardingConnector(solrurls, scheme, SolrChardingSelection.Method.MODULO_HOST_MD5) : null);
sb.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectSolr((usesolr) ? new SolrChardingConnector(solrurls, scheme, SolrChardingSelection.Method.MODULO_HOST_MD5, 10000) : null);
} catch (final IOException e) {
Log.logException(e);
sb.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectSolr(null);

@ -715,6 +715,10 @@ public class MultiProtocolURI implements Serializable, Comparable<MultiProtocolU
this.ref = null;
}
/**
* the userInfo is the authentication part in front of the host; separated by '@'
* @return a string like '<user>:<password>' or just '<user>'
*/
public String getUserInfo() {
return this.userInfo;
}

@ -42,17 +42,17 @@ import org.apache.solr.common.SolrInputDocument;
public class SolrChardingConnector implements SolrConnector {
private final List<SolrSingleConnector> connectors;
private final List<SolrConnector> connectors;
private final SolrScheme scheme;
private final SolrChardingSelection charding;
private final String[] urls;
public SolrChardingConnector(final String urlList, final SolrScheme scheme, final SolrChardingSelection.Method method) throws IOException {
public SolrChardingConnector(final String urlList, final SolrScheme scheme, final SolrChardingSelection.Method method, final long timeout) throws IOException {
urlList.replace(' ', ',');
this.urls = urlList.split(",");
this.connectors = new ArrayList<SolrSingleConnector>();
this.connectors = new ArrayList<SolrConnector>();
for (final String u: this.urls) {
this.connectors.add(new SolrSingleConnector(u.trim(), scheme));
this.connectors.add(new SolrRetryConnector(new SolrSingleConnector(u.trim(), scheme), timeout));
}
this.charding = new SolrChardingSelection(method, this.urls.length);
this.scheme = scheme;
@ -63,7 +63,7 @@ public class SolrChardingConnector implements SolrConnector {
}
public void close() {
for (final SolrSingleConnector connector: this.connectors) connector.close();
for (final SolrConnector connector: this.connectors) connector.close();
}
/**
@ -71,7 +71,7 @@ public class SolrChardingConnector implements SolrConnector {
* @throws IOException
*/
public void clear() throws IOException {
for (final SolrSingleConnector connector: this.connectors) connector.clear();
for (final SolrConnector connector: this.connectors) connector.clear();
}
/**
@ -80,7 +80,7 @@ public class SolrChardingConnector implements SolrConnector {
* @throws IOException
*/
public void delete(final String id) throws IOException {
for (final SolrSingleConnector connector: this.connectors) connector.delete(id);
for (final SolrConnector connector: this.connectors) connector.delete(id);
}
/**
@ -89,7 +89,20 @@ public class SolrChardingConnector implements SolrConnector {
* @throws IOException
*/
public void delete(final List<String> ids) throws IOException {
for (final SolrSingleConnector connector: this.connectors) connector.delete(ids);
for (final SolrConnector connector: this.connectors) connector.delete(ids);
}
/**
* check if a given id exists in solr
* @param id
* @return true if any entry in solr exists
* @throws IOException
*/
public boolean exists(final String id) throws IOException {
for (final SolrConnector connector: this.connectors) {
if (connector.exists(id)) return true;
}
return false;
}
/**
@ -108,7 +121,7 @@ public class SolrChardingConnector implements SolrConnector {
* @param solrdoc
* @throws IOException
*/
private void add(final SolrInputDocument solrdoc) throws IOException {
public void add(final SolrInputDocument solrdoc) throws IOException {
this.connectors.get(this.charding.select(solrdoc)).add(solrdoc);
}
@ -141,7 +154,7 @@ public class SolrChardingConnector implements SolrConnector {
*/
public SolrDocumentList get(final String querystring, final int offset, final int count) throws IOException {
final SolrDocumentList list = new SolrDocumentList();
for (final SolrSingleConnector connector: this.connectors) {
for (final SolrConnector connector: this.connectors) {
final SolrDocumentList l = connector.get(querystring, offset, count);
for (final SolrDocument d: l) {
list.add(d);
@ -153,7 +166,7 @@ public class SolrChardingConnector implements SolrConnector {
public SolrDocumentList[] getList(final String querystring, final int offset, final int count) throws IOException {
final SolrDocumentList[] list = new SolrDocumentList[this.connectors.size()];
int i = 0;
for (final SolrSingleConnector connector: this.connectors) {
for (final SolrConnector connector: this.connectors) {
list[i++] = connector.get(querystring, offset, count);
}
return list;
@ -162,7 +175,7 @@ public class SolrChardingConnector implements SolrConnector {
public long[] getSizeList() {
final long[] size = new long[this.connectors.size()];
int i = 0;
for (final SolrSingleConnector connector: this.connectors) {
for (final SolrConnector connector: this.connectors) {
size[i++] = connector.getSize();
}
return size;
@ -187,4 +200,5 @@ public class SolrChardingConnector implements SolrConnector {
}
return urlAdmin;
}
}

@ -32,6 +32,8 @@ import net.yacy.document.Document;
import net.yacy.kelondro.data.meta.DigestURI;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
public interface SolrConnector {
@ -63,6 +65,14 @@ public interface SolrConnector {
*/
public void delete(final List<String> ids) throws IOException;
/**
* check if a given id exists in solr
* @param id
* @return true if any entry in solr exists
* @throws IOException
*/
public boolean exists(final String id) throws IOException;
/**
* add a YaCy document. This calls the scheme processor to add the document as solr document
* @param id the url hash of the entry
@ -72,6 +82,14 @@ public interface SolrConnector {
*/
public void add(final String id, final ResponseHeader header, final Document doc) throws IOException;
/**
* add a solr input document
* @param solrdoc
* @throws IOException
* @throws SolrException
*/
public void add(final SolrInputDocument solrdoc) throws IOException, SolrException;
/**
* register an entry as error document
* @param digestURI

@ -0,0 +1,189 @@
/**
* SolrRetryConnector
* Copyright 2011 by Michael Peter Christen
* First released 08.11.2011 at http://yacy.net
*
* $LastChangedDate: 2011-04-14 22:05:04 +0200 (Do, 14 Apr 2011) $
* $LastChangedRevision: 7654 $
* $LastChangedBy: orbiter $
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program in the file lgpl21.txt
* If not, see <http://www.gnu.org/licenses/>.
*/
package net.yacy.cora.services.federated.solr;
import java.io.IOException;
import java.util.List;
import net.yacy.cora.protocol.ResponseHeader;
import net.yacy.document.Document;
import net.yacy.kelondro.data.meta.DigestURI;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
public class SolrRetryConnector implements SolrConnector {
private final SolrConnector solrConnector;
private final long retryMaxTime;
public SolrRetryConnector(final SolrConnector solrConnector, final long retryMaxTime) {
this.solrConnector = solrConnector;
this.retryMaxTime = retryMaxTime;
}
@Override
public SolrScheme getScheme() {
return this.solrConnector.getScheme();
}
@Override
public void close() {
this.solrConnector.close();
}
@Override
public void clear() throws IOException {
final long t = System.currentTimeMillis() + this.retryMaxTime;
Throwable ee = null;
while (System.currentTimeMillis() < t) try {
this.solrConnector.clear();
return;
} catch (final Throwable e) {
ee = e;
try {Thread.sleep(10);} catch (final InterruptedException e1) {}
continue;
}
if (ee != null) throw (ee instanceof IOException) ? (IOException) ee : new IOException(ee.getMessage());
}
@Override
public void delete(final String id) throws IOException {
final long t = System.currentTimeMillis() + this.retryMaxTime;
Throwable ee = null;
while (System.currentTimeMillis() < t) try {
this.solrConnector.delete(id);
return;
} catch (final Throwable e) {
ee = e;
try {Thread.sleep(10);} catch (final InterruptedException e1) {}
continue;
}
if (ee != null) throw (ee instanceof IOException) ? (IOException) ee : new IOException(ee.getMessage());
}
@Override
public void delete(final List<String> ids) throws IOException {
final long t = System.currentTimeMillis() + this.retryMaxTime;
Throwable ee = null;
while (System.currentTimeMillis() < t) try {
this.solrConnector.delete(ids);
return;
} catch (final Throwable e) {
ee = e;
try {Thread.sleep(10);} catch (final InterruptedException e1) {}
continue;
}
if (ee != null) throw (ee instanceof IOException) ? (IOException) ee : new IOException(ee.getMessage());
}
@Override
public boolean exists(final String id) throws IOException {
final long t = System.currentTimeMillis() + this.retryMaxTime;
Throwable ee = null;
while (System.currentTimeMillis() < t) try {
return this.solrConnector.exists(id);
} catch (final Throwable e) {
ee = e;
try {Thread.sleep(10);} catch (final InterruptedException e1) {}
continue;
}
if (ee != null) throw (ee instanceof IOException) ? (IOException) ee : new IOException(ee.getMessage());
return false;
}
@Override
public void add(final String id, final ResponseHeader header, final Document doc) throws IOException {
final long t = System.currentTimeMillis() + this.retryMaxTime;
Throwable ee = null;
while (System.currentTimeMillis() < t) try {
this.solrConnector.add(id, header, doc);
return;
} catch (final Throwable e) {
ee = e;
try {Thread.sleep(10);} catch (final InterruptedException e1) {}
continue;
}
if (ee != null) throw (ee instanceof IOException) ? (IOException) ee : new IOException(ee.getMessage());
}
@Override
public void add(final SolrInputDocument solrdoc) throws IOException, SolrException {
final long t = System.currentTimeMillis() + this.retryMaxTime;
Throwable ee = null;
while (System.currentTimeMillis() < t) try {
this.solrConnector.add(solrdoc);
return;
} catch (final Throwable e) {
ee = e;
try {Thread.sleep(10);} catch (final InterruptedException e1) {}
continue;
}
if (ee != null) throw (ee instanceof IOException) ? (IOException) ee : new IOException(ee.getMessage());
}
@Override
public void err(final DigestURI digestURI, final String failReason, final int httpstatus) throws IOException {
final long t = System.currentTimeMillis() + this.retryMaxTime;
Throwable ee = null;
while (System.currentTimeMillis() < t) try {
this.solrConnector.err(digestURI, failReason, httpstatus);
return;
} catch (final Throwable e) {
ee = e;
try {Thread.sleep(10);} catch (final InterruptedException e1) {}
continue;
}
if (ee != null) throw (ee instanceof IOException) ? (IOException) ee : new IOException(ee.getMessage());
}
@Override
public SolrDocumentList get(final String querystring, final int offset, final int count) throws IOException {
final long t = System.currentTimeMillis() + this.retryMaxTime;
Throwable ee = null;
while (System.currentTimeMillis() < t) try {
return this.solrConnector.get(querystring, offset, count);
} catch (final Throwable e) {
ee = e;
try {Thread.sleep(10);} catch (final InterruptedException e1) {}
continue;
}
if (ee != null) throw (ee instanceof IOException) ? (IOException) ee : new IOException(ee.getMessage());
return null;
}
@Override
public long getSize() {
final long t = System.currentTimeMillis() + this.retryMaxTime;
while (System.currentTimeMillis() < t) try {
return this.solrConnector.getSize();
} catch (final Throwable e) {
continue;
}
return 0;
}
}

@ -47,7 +47,6 @@ import org.apache.commons.httpclient.UsernamePasswordCredentials;
import org.apache.commons.httpclient.auth.AuthPolicy;
import org.apache.commons.httpclient.auth.AuthScope;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
@ -69,6 +68,12 @@ public class SolrSingleConnector implements SolrConnector {
private final BlockingQueue<SolrInputDocument>[] transmissionQueue; // the queues quere documents are collected
private int transmissionRoundRobinCounter; // a rount robin counter for the transmission queues
/**
* create a new solr connector
* @param url the solr url, like http://192.168.1.60:8983/solr/ or http://admin:pw@192.168.1.60:8983/solr/
* @param scheme
* @throws IOException
*/
@SuppressWarnings("unchecked")
public SolrSingleConnector(final String url, final SolrScheme scheme) throws IOException {
this.solrurl = url;
@ -189,7 +194,7 @@ public class SolrSingleConnector implements SolrConnector {
try {
final SolrDocumentList list = get("*:*", 0, 1);
return list.getNumFound();
} catch (final Exception e) {
} catch (final Throwable e) {
Log.logException(e);
return 0;
}
@ -203,7 +208,7 @@ public class SolrSingleConnector implements SolrConnector {
try {
this.server.deleteByQuery("*:*");
this.server.commit();
} catch (final SolrServerException e) {
} catch (final Throwable e) {
throw new IOException(e);
}
}
@ -211,7 +216,7 @@ public class SolrSingleConnector implements SolrConnector {
public void delete(final String id) throws IOException {
try {
this.server.deleteById(id);
} catch (final SolrServerException e) {
} catch (final Throwable e) {
throw new IOException(e);
}
}
@ -219,11 +224,21 @@ public class SolrSingleConnector implements SolrConnector {
public void delete(final List<String> ids) throws IOException {
try {
this.server.deleteById(ids);
} catch (final SolrServerException e) {
} catch (final Throwable e) {
throw new IOException(e);
}
}
public boolean exists(final String id) throws IOException {
try {
final SolrDocumentList list = get("id:" + id, 0, 1);
return list.getNumFound() > 0;
} catch (final Throwable e) {
Log.logException(e);
return false;
}
}
public void add(final File file, final String solrId) throws IOException {
final ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update/extract");
up.addFile(file);
@ -234,7 +249,7 @@ public class SolrSingleConnector implements SolrConnector {
try {
this.server.request(up);
this.server.commit();
} catch (final SolrServerException e) {
} catch (final Throwable e) {
throw new IOException(e);
}
}
@ -243,7 +258,7 @@ public class SolrSingleConnector implements SolrConnector {
add(this.scheme.yacy2solr(id, header, doc));
}
protected void add(final SolrInputDocument solrdoc) throws IOException, SolrException {
public void add(final SolrInputDocument solrdoc) throws IOException, SolrException {
int thisrrc = this.transmissionRoundRobinCounter;
int nextrrc = thisrrc++;
if (nextrrc >= transmissionQueueCount) nextrrc = 0;
@ -269,7 +284,7 @@ public class SolrSingleConnector implements SolrConnector {
req.add( docs );
UpdateResponse rsp = req.process( server );
*/
} catch (final SolrServerException e) {
} catch (final Throwable e) {
throw new IOException(e);
}
}
@ -335,7 +350,7 @@ public class SolrSingleConnector implements SolrConnector {
result.put(element)
}
*/
} catch (final SolrServerException e) {
} catch (final Throwable e) {
throw new IOException(e);
}

@ -32,7 +32,6 @@ import java.util.Iterator;
import net.yacy.cora.ranking.AbstractOrder;
import net.yacy.cora.ranking.Order;
import net.yacy.kelondro.index.HandleSet;
import net.yacy.kelondro.index.RowSpaceExceededException;
public final class NaturalOrder extends AbstractOrder<byte[]> implements ByteOrder, Comparator<byte[]>, Cloneable {
@ -43,7 +42,7 @@ public final class NaturalOrder extends AbstractOrder<byte[]> implements ByteOrd
this.zero = null;
}
public HandleSet getHandleSet(final int keylength, final int space) throws RowSpaceExceededException {
public HandleSet getHandleSet(final int keylength, final int space) {
return new HandleSet(keylength, this, space);
}

@ -850,6 +850,7 @@ public class Table implements Index, Iterable<Row.Entry> {
}
public int size() {
if (this.index == null) return 0;
return this.index.size();
}

@ -7,7 +7,7 @@
// $LastChangedBy$
//
// LICENSE
//
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
@ -26,7 +26,9 @@ package net.yacy.peers.dht;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;
import net.yacy.cora.document.ASCII;
@ -46,9 +48,6 @@ import net.yacy.peers.Seed;
import net.yacy.peers.SeedDB;
import net.yacy.search.index.Segment;
import java.util.List;
import java.util.SortedMap;
public class Transmission {
// The number of RWIs we can be sure a remote peer will accept
@ -60,13 +59,13 @@ public class Transmission {
protected SeedDB seeds;
protected boolean gzipBody4Transfer;
protected int timeout4Transfer;
public Transmission(
Log log,
Segment segment,
SeedDB seeds,
boolean gzipBody4Transfer,
int timeout4Transfer) {
final Log log,
final Segment segment,
final SeedDB seeds,
final boolean gzipBody4Transfer,
final int timeout4Transfer) {
this.log = log;
this.segment = segment;
this.seeds = seeds;
@ -74,7 +73,7 @@ public class Transmission {
this.timeout4Transfer = timeout4Transfer;
}
public Chunk newChunk(byte[] primaryTarget, final List<Seed> targets) {
public Chunk newChunk(final byte[] primaryTarget, final List<Seed> targets) {
return new Chunk(primaryTarget, targets);
}
@ -95,15 +94,15 @@ public class Transmission {
private final HandleSet badReferences;
private final List<Seed> targets;
private int hit, miss;
/**
* generate a new dispatcher target. such a target is defined with a primary target and
* generate a new dispatcher target. such a target is defined with a primary target and
* a set of target peers that shall receive the entries of the containers
* the payloadrow defines the structure of container entries
* @param primaryTarget
* @param targets
*/
public Chunk(byte[] primaryTarget, final List<Seed> targets) {
public Chunk(final byte[] primaryTarget, final List<Seed> targets) {
super();
this.primaryTarget = primaryTarget;
this.containers = new ReferenceContainerCache<WordReference>(Segment.wordReferenceFactory, Segment.wordOrder, Word.commonHashLength);
@ -113,7 +112,7 @@ public class Transmission {
this.hit = 0;
this.miss = 0;
}
/*
* return a new container with at most max elements and put the rest back to the index
* as this chunk might be transferred back to myself a random selection needs to be taken
@ -122,12 +121,12 @@ public class Transmission {
* @throws RowSpaceExceededException
* @return
*/
private ReferenceContainer<WordReference> trimContainer(ReferenceContainer<WordReference> container, final int max) throws RowSpaceExceededException {
private ReferenceContainer<WordReference> trimContainer(final ReferenceContainer<WordReference> container, final int max) throws RowSpaceExceededException {
final ReferenceContainer<WordReference> c = new ReferenceContainer<WordReference>(Segment.wordReferenceFactory, container.getTermHash(), max);
final int part = container.size() / max + 1;
final Random r = new Random();
WordReference w;
List<byte[]> selected = new ArrayList<byte[]>();
final List<byte[]> selected = new ArrayList<byte[]>();
final Iterator<WordReference> i = container.entries();
while ((i.hasNext()) && (c.size() < max)) {
w = i.next();
@ -140,8 +139,8 @@ public class Transmission {
for (final byte[] b : selected) container.removeReference(b);
// put container back
try {
segment.termIndex().add(container);
} catch (Exception e) {
Transmission.this.segment.termIndex().add(container);
} catch (final Exception e) {
Log.logException(e);
}
return c;
@ -151,45 +150,45 @@ public class Transmission {
* add a container to the Entry cache.
* all entries in the container are checked and only such are stored which have a reference entry
* @param container
* @throws RowSpaceExceededException
* @throws RowSpaceExceededException
*/
public void add(ReferenceContainer<WordReference> container) throws RowSpaceExceededException {
public void add(final ReferenceContainer<WordReference> container) throws RowSpaceExceededException {
int remaining = maxRWIsCount;
for (ReferenceContainer ic : this) remaining -= ic.size();
for (final ReferenceContainer<WordReference> ic : this) remaining -= ic.size();
if (remaining <= 0) {
// No space left in this chunk
try {
segment.termIndex().add(container);
} catch (Exception e) {
Transmission.this.segment.termIndex().add(container);
} catch (final Exception e) {
Log.logException(e);
}
return;
}
final ReferenceContainer<WordReference> c = (remaining >= container.size()) ? container : trimContainer(container, remaining);
// iterate through the entries in the container and check if the reference is in the repository
Iterator<WordReference> i = c.entries();
List<byte[]> notFoundx = new ArrayList<byte[]>();
final Iterator<WordReference> i = c.entries();
final List<byte[]> notFoundx = new ArrayList<byte[]>();
while (i.hasNext()) {
WordReference e = i.next();
if (references.containsKey(e.urlhash())) continue;
if (badReferences.has(e.urlhash())) {
final WordReference e = i.next();
if (this.references.containsKey(e.urlhash())) continue;
if (this.badReferences.has(e.urlhash())) {
notFoundx.add(e.urlhash());
continue;
}
URIMetadataRow r = segment.urlMetadata().load(e.urlhash());
final URIMetadataRow r = Transmission.this.segment.urlMetadata().load(e.urlhash());
if (r == null) {
notFoundx.add(e.urlhash());
badReferences.put(e.urlhash());
this.badReferences.put(e.urlhash());
} else {
references.put(e.urlhash(), r);
this.references.put(e.urlhash(), r);
}
}
// now delete all references that were not found
for (final byte[] b : notFoundx) c.removeReference(b);
// finally add the remaining container to the cache
containers.add(c);
this.containers.add(c);
}
/**
* get all containers from the entry. This method may be used to flush remaining entries
* if they had been finished transmission without success (not enough peers arrived)
@ -197,15 +196,15 @@ public class Transmission {
public Iterator<ReferenceContainer<WordReference>> iterator() {
return this.containers.iterator();
}
public int containersSize() {
return this.containers.size();
}
public byte[] primaryTarget() {
return this.primaryTarget;
}
/**
* return the number of successful transmissions
* @return
@ -213,7 +212,7 @@ public class Transmission {
public int hit() {
return this.hit;
}
/**
* return the number of unsuccessful transmissions
* @return
@ -221,7 +220,7 @@ public class Transmission {
public int miss() {
return this.miss;
}
/**
* return the number of targets that are left in the target cache
* if this is empty, there may be no more use of this object and it should be flushed
@ -231,51 +230,51 @@ public class Transmission {
public int targets() {
return this.targets.size();
}
public boolean transmit() {
if (this.targets.isEmpty()) return false;
Seed target = this.targets.remove(0);
final Seed target = this.targets.remove(0);
// transferring selected words to remote peer
if (target == seeds.mySeed() || target.hash.equals(seeds.mySeed().hash)) {
if (target == Transmission.this.seeds.mySeed() || target.hash.equals(Transmission.this.seeds.mySeed().hash)) {
// target is my own peer. This is easy. Just restore the indexContainer
restore();
this.hit++;
log.logInfo("Transfer of chunk to myself-target");
Transmission.this.log.logInfo("Transfer of chunk to myself-target");
return true;
}
log.logInfo("starting new index transmission request to " + ASCII.String(this.primaryTarget));
long start = System.currentTimeMillis();
final String error = Protocol.transferIndex(target, this.containers, this.references, gzipBody4Transfer, timeout4Transfer);
Transmission.this.log.logInfo("starting new index transmission request to " + ASCII.String(this.primaryTarget));
final long start = System.currentTimeMillis();
final String error = Protocol.transferIndex(target, this.containers, this.references, Transmission.this.gzipBody4Transfer, Transmission.this.timeout4Transfer);
if (error == null) {
// words successfully transfered
long transferTime = System.currentTimeMillis() - start;
Iterator<ReferenceContainer<WordReference>> i = this.containers.iterator();
ReferenceContainer<WordReference> firstContainer = (i == null) ? null : i.next();
log.logInfo("Index transfer of " + this.containers.size() +
" words [" + ((firstContainer == null) ? null : ASCII.String(firstContainer.getTermHash())) + " .. " + ASCII.String(this.primaryTarget) + "]" +
final long transferTime = System.currentTimeMillis() - start;
final Iterator<ReferenceContainer<WordReference>> i = this.containers.iterator();
final ReferenceContainer<WordReference> firstContainer = (i == null) ? null : i.next();
Transmission.this.log.logInfo("Index transfer of " + this.containers.size() +
" words [" + ((firstContainer == null) ? null : ASCII.String(firstContainer.getTermHash())) + " .. " + ASCII.String(this.primaryTarget) + "]" +
" and " + this.references.size() + " URLs" +
" to peer " + target.getName() + ":" + target.hash +
" in " + (transferTime / 1000) +
" seconds successful (" + (1000 * this.containers.size() / (transferTime + 1)) +
" to peer " + target.getName() + ":" + target.hash +
" in " + (transferTime / 1000) +
" seconds successful (" + (1000 * this.containers.size() / (transferTime + 1)) +
" words/s)");
seeds.mySeed().incSI(this.containers.size());
seeds.mySeed().incSU(this.references.size());
Transmission.this.seeds.mySeed().incSI(this.containers.size());
Transmission.this.seeds.mySeed().incSU(this.references.size());
// if the peer has set a pause time and we are in flush mode (index transfer)
// then we pause for a while now
log.logInfo("Transfer finished of chunk to target " + target.hash + "/" + target.getName());
Transmission.this.log.logInfo("Transfer finished of chunk to target " + target.hash + "/" + target.getName());
this.hit++;
return true;
}
this.miss++;
// write information that peer does not receive index transmissions
log.logInfo("Transfer failed of chunk to target " + target.hash + "/" + target.getName() + ": " + error);
Transmission.this.log.logInfo("Transfer failed of chunk to target " + target.hash + "/" + target.getName() + ": " + error);
// get possibly newer target Info
Seed newTarget = seeds.get(target.hash);
final Seed newTarget = Transmission.this.seeds.get(target.hash);
if (newTarget != null) {
String oldAddress = target.getPublicAddress();
final String oldAddress = target.getPublicAddress();
if ((oldAddress != null) && (oldAddress.equals(newTarget.getPublicAddress()))) {
newTarget.setFlagAcceptRemoteIndex(false);
seeds.update(newTarget.hash, newTarget);
Transmission.this.seeds.update(newTarget.hash, newTarget);
} else {
// we tried an old Address. Don't change anything
}
@ -284,25 +283,25 @@ public class Transmission {
}
return false;
}
public boolean isFinished() {
//System.out.println("canFinish: hit = " + this.hit + ", redundancy = " + seeds.redundancy() + ", targets.size() = " + targets.size());
return this.hit >= seeds.redundancy();
return this.hit >= Transmission.this.seeds.redundancy();
}
public boolean canFinish() {
//System.out.println("canFinish: hit = " + this.hit + ", redundancy = " + seeds.redundancy() + ", targets.size() = " + targets.size());
return this.targets.size() >= seeds.redundancy() - this.hit;
return this.targets.size() >= Transmission.this.seeds.redundancy() - this.hit;
}
public void restore() {
for (ReferenceContainer<WordReference> ic : this) try {
segment.termIndex().add(ic);
} catch (Exception e) {
for (final ReferenceContainer<WordReference> ic : this) try {
Transmission.this.segment.termIndex().add(ic);
} catch (final Exception e) {
Log.logException(e);
}
}
}
}

@ -225,7 +225,7 @@ public final class Switchboard extends serverSwitch {
public RobotsTxt robots;
public Map<String, Object[]> outgoingCookies, incomingCookies;
public volatile long proxyLastAccess, localSearchLastAccess, remoteSearchLastAccess;
public Network yc;
public Network yc;
public ResourceObserver observer;
public UserDB userDB;
public BookmarksDB bookmarksDB;
@ -599,7 +599,7 @@ public final class Switchboard extends serverSwitch {
final String solrurls = getConfig("federated.service.solr.indexing.url", "http://127.0.0.1:8983/solr");
final boolean usesolr = getConfigBool("federated.service.solr.indexing.enabled", false) & solrurls.length() > 0;
try {
this.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectSolr((usesolr) ? new SolrChardingConnector(solrurls, workingScheme, SolrChardingSelection.Method.MODULO_HOST_MD5) : null);
this.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectSolr((usesolr) ? new SolrChardingConnector(solrurls, workingScheme, SolrChardingSelection.Method.MODULO_HOST_MD5, 10000) : null);
} catch (final IOException e) {
Log.logException(e);
this.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectSolr(null);

@ -45,6 +45,7 @@ import net.yacy.cora.protocol.http.HTTPClient;
import net.yacy.cora.ranking.ConcurrentScoreMap;
import net.yacy.cora.ranking.ScoreMap;
import net.yacy.cora.ranking.WeakPriorityBlockingQueue;
import net.yacy.cora.services.federated.solr.SolrConnector;
import net.yacy.document.parser.html.CharacterCoding;
import net.yacy.kelondro.data.meta.DigestURI;
import net.yacy.kelondro.data.meta.URIMetadataRow;
@ -69,6 +70,7 @@ public final class MetadataRepository implements Iterable<byte[]> {
private final File location;
private final String tablename;
private ArrayList<HostStat> statsDump;
private SolrConnector solr;
public MetadataRepository(
final File path,
@ -82,6 +84,15 @@ public final class MetadataRepository implements Iterable<byte[]> {
this.urlIndexFile = backupIndex; //new Cache(backupIndex, 20000000, 20000000);
this.exportthread = null; // will have a export thread assigned if exporter is running
this.statsDump = null;
this.solr = null;
}
public void connectSolr(final SolrConnector solr) {
this.solr = solr;
}
public SolrConnector getSolr() {
return this.solr;
}
public void clearCache() {
@ -110,6 +121,7 @@ public final class MetadataRepository implements Iterable<byte[]> {
this.urlIndexFile.close();
this.urlIndexFile = null;
}
if (this.solr != null) this.solr.close();
}
public int writeCacheSize() {
@ -191,6 +203,12 @@ public final class MetadataRepository implements Iterable<byte[]> {
}
public boolean exists(final byte[] urlHash) {
try {
if (this.solr != null && this.solr.exists(ASCII.String(urlHash))) {
return true;
}
} catch (final Throwable e) {
}
if (this.urlIndexFile == null) return false; // case may happen during shutdown
return this.urlIndexFile.has(urlHash);
}

@ -85,7 +85,6 @@ public class Segment {
protected final IndexCell<WordReference> termIndex;
//private final IndexCell<NavigationReference> authorNavIndex;
protected final MetadataRepository urlMetadata;
private SolrConnector solr;
private final File segmentPath;
public Segment(
@ -103,7 +102,6 @@ public class Segment {
this.log = log;
this.segmentPath = segmentPath;
this.solr = null;
this.termIndex = new IndexCell<WordReference>(
segmentPath,
@ -133,11 +131,11 @@ public class Segment {
}
public void connectSolr(final SolrConnector solr) {
this.solr = solr;
this.urlMetadata.connectSolr(solr);
}
public SolrConnector getSolr() {
return this.solr;
return this.urlMetadata.getSolr();
}
public static void migrateTextIndex(final File oldSegmentPath, final File newSegmentPath) {
@ -173,8 +171,8 @@ public class Segment {
public IndexCell<WordReference> termIndex() {
return this.termIndex;
}
public boolean exists(byte[] urlhash) {
public boolean exists(final byte[] urlhash) {
return this.urlMetadata.exists(urlhash);
}
@ -272,7 +270,6 @@ public class Segment {
public void close() {
this.termIndex.close();
this.urlMetadata.close();
if (this.solr != null) this.solr.close();
}
public URIMetadataRow storeDocument(

Loading…
Cancel
Save