@ -31,7 +31,9 @@ import java.util.concurrent.LinkedBlockingQueue;
import net.yacy.cora.sorting.ReversibleScoreMap ;
import net.yacy.cora.storage.ARC ;
import net.yacy.cora.storage.ARH ;
import net.yacy.cora.storage.ConcurrentARC ;
import net.yacy.cora.storage.ConcurrentARH ;
import net.yacy.cora.util.ConcurrentLog ;
import net.yacy.kelondro.util.MemoryControl ;
import net.yacy.search.schema.CollectionSchema ;
@ -66,7 +68,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
try {
removeIdFromUpdateQueue ( id ) ;
ConcurrentUpdateSolrConnector . this . connector . deleteById ( id ) ;
ConcurrentUpdateSolrConnector . this . id Cache. remove ( id ) ;
ConcurrentUpdateSolrConnector . this . metadata Cache. remove ( id ) ;
} catch ( final IOException e ) {
ConcurrentLog . logException ( e ) ;
}
@ -90,7 +92,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
docs . add ( doc ) ;
String id = ( String ) doc . getFieldValue ( CollectionSchema . id . getSolrFieldName ( ) ) ;
Metadata md = AbstractSolrConnector . getMetadata ( doc ) ;
update Id Cache( id , md ) ;
update Cache( id , md ) ;
for ( int i = 0 ; i < getmore ; i + + ) {
SolrInputDocument d = ConcurrentUpdateSolrConnector . this . updateQueue . take ( ) ;
if ( d = = POISON_DOCUMENT ) {
@ -100,7 +102,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
docs . add ( d ) ;
id = ( String ) d . getFieldValue ( CollectionSchema . id . getSolrFieldName ( ) ) ;
md = AbstractSolrConnector . getMetadata ( d ) ;
update Id Cache( id , md ) ;
update Cache( id , md ) ;
}
//ConcurrentLog.info("ConcurrentUpdateSolrConnector", "sending " + docs.size() + " documents to solr");
try {
@ -113,7 +115,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
//ConcurrentLog.info("ConcurrentUpdateSolrConnector", "sending one document to solr");
String id = ( String ) doc . getFieldValue ( CollectionSchema . id . getSolrFieldName ( ) ) ;
Metadata md = AbstractSolrConnector . getMetadata ( doc ) ;
update Id Cache( id , md ) ;
update Cache( id , md ) ;
try {
ConcurrentUpdateSolrConnector . this . connector . add ( doc ) ;
} catch ( final OutOfMemoryError e ) {
@ -135,14 +137,16 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
}
}
private ARC < String , Metadata > idCache ;
private ARC < String , Metadata > metadataCache ;
private ARH < String > missCache ;
private BlockingQueue < SolrInputDocument > updateQueue ;
private BlockingQueue < String > deleteQueue ;
private Thread deletionHandler , updateHandler ;
public ConcurrentUpdateSolrConnector ( SolrConnector connector , int updateCapacity , int idCacheCapacity , int concurrency ) {
this . connector = connector ;
this . idCache = new ConcurrentARC < String , Metadata > ( idCacheCapacity , concurrency ) ; // url hash to load time
this . metadataCache = new ConcurrentARC < String , Metadata > ( idCacheCapacity , concurrency ) ;
this . missCache = new ConcurrentARH < String > ( idCacheCapacity , concurrency ) ;
this . updateQueue = new ArrayBlockingQueue < SolrInputDocument > ( updateCapacity ) ;
this . deleteQueue = new LinkedBlockingQueue < String > ( ) ;
this . deletionHandler = null ;
@ -159,7 +163,8 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
@Override
public void clearCaches ( ) {
this . connector . clearCaches ( ) ;
this . idCache . clear ( ) ;
this . metadataCache . clear ( ) ;
this . missCache . clear ( ) ;
}
/ * *
@ -233,10 +238,14 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
}
}
private void update Id Cache( final String id , final Metadata md ) {
private void update Cache( final String id , final Metadata md ) {
if ( id = = null ) return ;
if ( MemoryControl . shortStatus ( ) ) this . idCache . clear ( ) ;
this . idCache . put ( id , md ) ;
if ( MemoryControl . shortStatus ( ) ) {
this . metadataCache . clear ( ) ;
this . missCache . clear ( ) ;
}
this . metadataCache . put ( id , md ) ;
this . missCache . delete ( id ) ;
}
public void ensureAliveDeletionHandler ( ) {
@ -307,9 +316,9 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
try { this . deletionHandler . join ( ) ; } catch ( final InterruptedException e ) { }
try { this . updateHandler . join ( ) ; } catch ( final InterruptedException e ) { }
this . connector . close ( ) ;
this . id Cache. clear ( ) ;
this . metadata Cache. clear ( ) ;
this . connector = null ;
this . id Cache = null ;
this . metadata Cache = null ;
}
@Override
@ -319,13 +328,14 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
try { this . updateQueue . put ( POISON_DOCUMENT ) ; } catch ( final InterruptedException e ) { }
try { this . updateHandler . join ( ) ; } catch ( final InterruptedException e ) { }
this . connector . clear ( ) ;
this . id Cache. clear ( ) ;
this . metadata Cache. clear ( ) ;
}
@Override
public void deleteById ( String id ) throws IOException {
public synchronized void deleteById ( String id ) throws IOException {
removeIdFromUpdateQueue ( id ) ;
this . idCache . remove ( id ) ;
this . metadataCache . remove ( id ) ;
this . missCache . add ( id ) ;
if ( this . deletionHandler . isAlive ( ) ) {
try { this . deleteQueue . put ( id ) ; } catch ( final InterruptedException e ) { }
} else {
@ -334,10 +344,11 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
}
@Override
public void deleteByIds ( Collection < String > ids ) throws IOException {
public synchronized void deleteByIds ( Collection < String > ids ) throws IOException {
for ( String id : ids ) {
removeIdFromUpdateQueue ( id ) ;
this . idCache . remove ( id ) ;
this . metadataCache . remove ( id ) ;
this . missCache . add ( id ) ;
}
if ( this . deletionHandler . isAlive ( ) ) {
for ( String id : ids ) try { this . deleteQueue . put ( id ) ; } catch ( final InterruptedException e ) { }
@ -348,30 +359,27 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
@Override
public void deleteByQuery ( final String querystring ) throws IOException {
//new Thread() {
// public void run() {
ConcurrentUpdateSolrConnector . this . idCache . clear ( ) ;
try {
ConcurrentUpdateSolrConnector . this . connector . deleteByQuery ( querystring ) ;
ConcurrentUpdateSolrConnector . this . idCache . clear ( ) ;
} catch ( final IOException e ) {
ConcurrentLog . severe ( "ConcurrentUpdateSolrConnector" , e . getMessage ( ) , e ) ;
}
ConcurrentUpdateSolrConnector . this . connector . commit ( true ) ;
// }
//}.start();
try {
ConcurrentUpdateSolrConnector . this . connector . deleteByQuery ( querystring ) ;
ConcurrentUpdateSolrConnector . this . metadataCache . clear ( ) ;
ConcurrentUpdateSolrConnector . this . missCache . clear ( ) ;
} catch ( final IOException e ) {
ConcurrentLog . severe ( "ConcurrentUpdateSolrConnector" , e . getMessage ( ) , e ) ;
}
ConcurrentUpdateSolrConnector . this . connector . commit ( true ) ;
}
@Override
public Metadata getMetadata ( String id ) throws IOException {
Metadata md = this . idCache . get ( id ) ;
if ( this . missCache . contains ( id ) ) { cacheSuccessSign ( ) ; return null ; }
Metadata md = this . metadataCache . get ( id ) ;
if ( md ! = null ) { cacheSuccessSign ( ) ; return md ; }
if ( existIdFromDeleteQueue ( id ) ) { cacheSuccessSign ( ) ; return null ; }
md = existIdFromUpdateQueue ( id ) ;
if ( md ! = null ) { cacheSuccessSign ( ) ; return md ; }
md = this . connector . getMetadata ( id ) ;
if ( md = = null ) return null ;
update Id Cache( id , md ) ;
if ( md = = null ) { this . missCache . add ( id ) ; return null ; }
update Cache( id , md ) ;
return md ;
}
@ -379,7 +387,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
public void add ( SolrInputDocument solrdoc ) throws IOException , SolrException {
String id = ( String ) solrdoc . getFieldValue ( CollectionSchema . id . getSolrFieldName ( ) ) ;
removeIdFromDeleteQueue ( id ) ;
update Id Cache( id , AbstractSolrConnector . getMetadata ( solrdoc ) ) ;
update Cache( id , AbstractSolrConnector . getMetadata ( solrdoc ) ) ;
if ( this . updateHandler . isAlive ( ) ) {
try { this . updateQueue . put ( solrdoc ) ; } catch ( final InterruptedException e ) { }
} else {
@ -392,7 +400,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
for ( SolrInputDocument doc : solrdocs ) {
String id = ( String ) doc . getFieldValue ( CollectionSchema . id . getSolrFieldName ( ) ) ;
removeIdFromDeleteQueue ( id ) ;
update Id Cache( id , AbstractSolrConnector . getMetadata ( doc ) ) ;
update Cache( id , AbstractSolrConnector . getMetadata ( doc ) ) ;
}
if ( this . updateHandler . isAlive ( ) ) {
for ( SolrInputDocument doc : solrdocs ) try { this . updateQueue . put ( doc ) ; } catch ( final InterruptedException e ) { }
@ -403,11 +411,16 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
@Override
public SolrDocument getDocumentById ( final String id , String . . . fields ) throws IOException {
if ( this . missCache . contains ( id ) ) return null ;
if ( existIdFromDeleteQueue ( id ) ) return null ;
SolrInputDocument idoc = getFromUpdateQueue ( id ) ;
if ( idoc ! = null ) { cacheSuccessSign ( ) ; return ClientUtils . toSolrDocument ( idoc ) ; }
SolrDocument doc = this . connector . getDocumentById ( id , AbstractSolrConnector . ensureEssentialFieldsIncluded ( fields ) ) ;
if ( doc ! = null ) updateIdCache ( id , AbstractSolrConnector . getMetadata ( doc ) ) ;
if ( doc = = null ) {
this . missCache . add ( id ) ;
} else {
updateCache ( id , AbstractSolrConnector . getMetadata ( doc ) ) ;
}
return doc ;
}