@@ -21,11 +21,10 @@
package net.yacy.cora.federate.solr.connector;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import net.yacy.cora.sorting.ReversibleScoreMap;
@@ -54,75 +53,40 @@ import org.apache.solr.common.params.ModifiableSolrParams;
*/
public class ConcurrentUpdateSolrConnector implements SolrConnector {
SolrConnector connector;
private final static Object POISON_PROCESS = new Object();
private final static long AUTOCOMMIT = 3000; // milliseconds
private class ProcessHandler extends Thread {
private class CommitHandler extends Thread {
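// The CommitHandler below is the new background flush loop: it wakes every AUTOCOMMIT
// milliseconds, writes the buffered documents to the wrapped connector via commitDocBuffer(),
// and performs one final flush when commitProcessRunning is cleared.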
@Override
public void run() {
try {
Object process;
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
while ((process = ConcurrentUpdateSolrConnector.this.processQueue.take()) != POISON_PROCESS) {
if (process instanceof String) {
// delete document
if (docs.size() > 0) addSynchronized(docs);
String id = (String) process;
try {
ConcurrentUpdateSolrConnector.this.connector.deleteById(id);
} catch (final IOException e) {
ConcurrentLog.logException(e);
}
}
if (process instanceof SolrInputDocument) {
SolrInputDocument doc = (SolrInputDocument) process;
docs.add(doc);
}
if (docs.size() > 0 &&
(ConcurrentUpdateSolrConnector.this.processQueue.size() == 0 ||
docs.size() >= ConcurrentUpdateSolrConnector.this.processQueue.size() + ConcurrentUpdateSolrConnector.this.processQueue.remainingCapacity())) {
addSynchronized(docs);
while (ConcurrentUpdateSolrConnector.this.commitProcessRunning) {
commitDocBuffer();
try { Thread.sleep(AUTOCOMMIT); } catch (final InterruptedException e) {
ConcurrentLog.logException(e);
}
}
} catch (final InterruptedException e) {
ConcurrentLog.logException(e);
} finally {
commitDocBuffer();
}
}
private void addSynchronized(final Collection<SolrInputDocument> docs) {
assert docs.size() > 0;
try {
ConcurrentUpdateSolrConnector.this.connector.add(docs);
} catch (final OutOfMemoryError e) {
// clear and try again...
clearCaches();
try {
ConcurrentUpdateSolrConnector.this.connector.add(docs);
} catch (final IOException ee) {
ConcurrentLog.logException(e);
}
} catch (final IOException e) {
ConcurrentLog.logException(e);
}
docs.clear();
}
}
private SolrConnector connector;
private ARC<String, Metadata> metadataCache;
private ARH<String> missCache;
private BlockingQueue<Object> processQueue;
private ProcessHandler processHandler;
private final ARH<String> missCache;
private final LinkedHashMap<String, SolrInputDocument> docBuffer;
private CommitHandler processHandler;
private final int updateCapacity;
private boolean commitProcessRunning;
public ConcurrentUpdateSolrConnector(final SolrConnector connector, final int updateCapacity, final int idCacheCapacity, final int concurrency) {
this.connector = connector;
this.metadataCache = new ConcurrentARC<String, Metadata>(idCacheCapacity, concurrency);
this.missCache = new ConcurrentARH<String>(idCacheCapacity, concurrency);
this.processQueue = new ArrayBlockingQueue<Object>(updateCapacity);
this.updateCapacity = updateCapacity;
this.metadataCache = new ConcurrentARC<>(idCacheCapacity, concurrency);
this.missCache = new ConcurrentARH<>(idCacheCapacity, concurrency);
this.docBuffer = new LinkedHashMap<>();
this.processHandler = null;
this.commitProcessRunning = true;
ensureAliveProcessHandler();
}
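// Usage sketch (illustrative values, not part of this change): wrapping a backend as
// new ConcurrentUpdateSolrConnector(backend, 100, 10000, 10) buffers roughly up to
// updateCapacity documents in docBuffer, keeps idCacheCapacity entries in the metadata
// and miss caches, and lets the CommitHandler flush the buffer in the background.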
@@ -136,9 +100,34 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
return o instanceof ConcurrentUpdateSolrConnector && this.connector.equals(((ConcurrentUpdateSolrConnector) o).connector);
}
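// commitDocBuffer() below is the single flush path: it writes the buffered documents to the
// wrapped connector (retrying once after clearCaches() on OutOfMemoryError), copies their
// metadata into metadataCache, and then empties the buffer.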
private void commitDocBuffer() {
synchronized (this.docBuffer) {
//System.out.println("*** commit of " + this.docBuffer.size() + " documents");
//Thread.dumpStack();
if (this.docBuffer.size() > 0) try {
this.connector.add(this.docBuffer.values());
} catch (final OutOfMemoryError e) {
// clear and try again...
clearCaches();
try {
this.connector.add(this.docBuffer.values());
} catch (final IOException ee) {
ConcurrentLog.logException(e);
}
} catch (final IOException e) {
ConcurrentLog.logException(e);
}
// move documents to metadata cache
for (Map.Entry<String, SolrInputDocument> entry: this.docBuffer.entrySet()) {
updateCache(entry.getKey(), AbstractSolrConnector.getMetadata(entry.getValue()));
}
this.docBuffer.clear();
}
}
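// Note: with the processQueue gone, bufferSize() now reports the configured update capacity
// rather than the number of pending updates.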
@Override
public int bufferSize() {
return this.processQueue.size();
return this.updateCapacity;
}
@Override
@@ -148,57 +137,6 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
this.missCache.clear();
}
/**
 * used for debugging
 */
private static void cacheSuccessSign() {
//ConcurrentLog.info("ConcurrentUpdate", "**** cache hit");
}
private boolean containsDeleteInProcessQueue(final String id) {
boolean delete = false;
boolean ctch = false;
for (Object o: this.processQueue) {
if (o == null) break;
if (checkDelete(o, id)) delete = true; // do not add a break here!
if (checkAdd(o, id)) { delete = false; ctch = true; } // do not add a break here!
}
if (ctch && delete) removeFromProcessQueue(id); // clean up put+remove
return delete;
}
private SolrInputDocument getFromProcessQueue(final String id) {
SolrInputDocument d = null;
boolean ctch = false;
for (Object o: this.processQueue) {
if (o == null) break;
if (checkDelete(o, id)) d = null; // do not add a break here!
if (checkAdd(o, id)) { d = (SolrInputDocument) o; ctch = true; } // do not add a break here!
}
if (ctch && d == null) removeFromProcessQueue(id); // clean up put+remove
return d;
}
private void removeFromProcessQueue(final String id) {
Iterator<Object> i = this.processQueue.iterator();
while (i.hasNext()) {
if (checkAdd(i.next(), id)) { i.remove(); break; }
}
}
private boolean checkDelete(final Object o, final String id) {
if (!(o instanceof String)) return false;
String docID = (String) o;
return (docID != null && docID.equals(id));
}
private boolean checkAdd(final Object o, final String id) {
if (!(o instanceof SolrInputDocument)) return false;
SolrInputDocument doc = (SolrInputDocument) o;
String docID = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
return (docID != null && docID.equals(id));
}
private void updateCache(final String id, final Metadata md) {
if (id == null) return;
if (MemoryControl.shortStatus()) {
@@ -211,7 +149,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
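// ensureAliveProcessHandler() below lazily starts, or restarts, the background commit
// thread whenever it is missing or has died.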
public void ensureAliveProcessHandler() {
if (this.processHandler == null || !this.processHandler.isAlive()) {
this.processHandler = new ProcessHandler();
this.processHandler = new CommitHandler();
this.processHandler.setName(this.getClass().getName() + "_ProcessHandler");
this.processHandler.start();
}
@@ -224,22 +162,19 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
@Override
public long getSize() {
return this.connector.getSize() + this.processQueue.size();
return this.connector.getSize() + this.docBuffer.size();
}
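// commit() no longer polls for the processQueue to drain; it flushes the docBuffer
// synchronously and then delegates the commit to the wrapped connector.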
@Override
public void commit(boolean softCommit) {
long timeout = System.currentTimeMillis() + 1000;
ensureAliveProcessHandler();
while (this.processQueue.size() > 0) {
try { Thread.sleep(10); } catch (final InterruptedException e) { }
if (System.currentTimeMillis() > timeout) break;
}
commitDocBuffer();
this.connector.commit(softCommit);
}
@Override
public void optimize(int maxSegments) {
commitDocBuffer();
this.connector.optimize(maxSegments);
}
@@ -256,7 +191,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
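// Shutdown now signals the commit thread by clearing commitProcessRunning instead of
// enqueueing the POISON_PROCESS marker; join() then waits for the final buffer flush.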
@Override
public void close() {
ensureAliveProcessHandler();
try { this.processQueue.put(POISON_PROCESS); } catch (final InterruptedException e) { }
this.commitProcessRunning = false;
try { this.processHandler.join(); } catch (final InterruptedException e) { }
this.connector.close();
this.metadataCache.clear();
@@ -266,21 +201,20 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
@Override
public void clear() throws IOException {
this.processQueue.clear();
this.docBuffer.clear();
this.connector.clear();
this.metadataCache.clear();
this.missCache.clear();
}
@Override
public synchronized void deleteById(String id) throws IOException {
this.metadataCache.remove(id);
this.missCache.add(id);
ensureAliveProcessHandler();
if (this.processHandler.isAlive()) {
try { this.processQueue.put(id); } catch (final InterruptedException e) { }
} else {
this.connector.deleteById(id);
synchronized (this.docBuffer) {
this.docBuffer.remove(id);
}
this.connector.deleteById(id);
}
@Override
@@ -289,33 +223,40 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
this.metadataCache.remove(id);
this.missCache.add(id);
}
ensureAliveProcessHandler();
if (this.processHandler.isAlive()) {
for (String id: ids) try { this.processQueue.put(id); } catch (final InterruptedException e) { }
} else {
this.connector.deleteByIds(ids);
synchronized (this.docBuffer) {
for (String id: ids) {
this.docBuffer.remove(id);
}
}
this.connector.deleteByIds(ids);
}
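// deleteByQuery() cannot match against buffered documents, so the docBuffer is flushed
// before the query is handed to the wrapped connector and the metadata cache is cleared.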
@Override
public void deleteByQuery(final String querystring) throws IOException {
commitDocBuffer();
try {
ConcurrentUpdateSolrConnector.this.connector.deleteByQuery(querystring);
ConcurrentUpdateSolrConnector.this.metadataCache.clear();
this.connector.deleteByQuery(querystring);
this.metadataCache.clear();
} catch (final IOException e) {
ConcurrentLog.severe("ConcurrentUpdateSolrConnector", e.getMessage(), e);
}
ConcurrentUpdateSolrConnector.this.connector.commit(true);
}
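// getMetadata() resolves an id in order: miss cache (known absent), metadata cache,
// docBuffer (pending writes), and finally the wrapped connector, updating the caches
// with whatever it finds.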
@Override
public Metadata getMetadata(String id) throws IOException {
if (this.missCache.contains(id)) { cacheSuccessSign(); return null; }
if (this.missCache.contains(id)) return null;
Metadata md = this.metadataCache.get(id);
if (md != null) { cacheSuccessSign(); return md; }
if (containsDeleteInProcessQueue(id)) { cacheSuccessSign(); return null; }
SolrInputDocument doc = getFromProcessQueue(id);
if (doc != null) { cacheSuccessSign(); return AbstractSolrConnector.getMetadata(doc); }
if (md != null) {
//System.out.println("*** metadata cache hit; metadataCache.size() = " + metadataCache.size());
//Thread.dumpStack();
return md;
}
SolrInputDocument doc = this.docBuffer.get(id);
if (doc != null) {
//System.out.println("*** docBuffer cache hit; docBuffer.size() = " + docBuffer.size());
//Thread.dumpStack();
return AbstractSolrConnector.getMetadata(doc);
}
md = this.connector.getMetadata(id);
if (md == null) { this.missCache.add(id); return null; }
updateCache(id, md);
@@ -325,26 +266,34 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
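// add(SolrInputDocument) drops any stale metadata cache entry for the id, puts the document
// into the docBuffer, and triggers a flush once the buffer exceeds updateCapacity or memory
// runs short.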
@Override
public void add(SolrInputDocument solrdoc) throws IOException, SolrException {
String id = (String) solrdoc.getFieldValue(CollectionSchema.id.getSolrFieldName());
updateCache(id, AbstractSolrConnector.getMetadata(solrdoc));
this.metadataCache.remove(id); // remove the id from the metadata cache because it will be overwritten by the update process anyway
ensureAliveProcessHandler();
if (this.processHandler.isAlive()) {
try { this.processQueue.put(solrdoc); } catch (final InterruptedException e) { }
synchronized (this.docBuffer) { this.docBuffer.put(id, solrdoc); }
} else {
this.connector.add(solrdoc);
updateCache(id, AbstractSolrConnector.getMetadata(solrdoc));
}
if (MemoryControl.shortStatus() || this.docBuffer.size() > this.updateCapacity) {
commitDocBuffer();
}
}
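// add(Collection) buffers each document the same way; if the commit thread is not alive the
// documents are written directly to the wrapped connector instead.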
@Override
public void add(Collection<SolrInputDocument> solrdocs) throws IOException, SolrException {
for (SolrInputDocument doc: solrdocs) {
String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
updateCache(id, AbstractSolrConnector.getMetadata(doc));
}
ensureAliveProcessHandler();
if (this.processHandler.isAlive()) {
for (SolrInputDocument doc: solrdocs) try { this.processQueue.put(doc); } catch (final InterruptedException e) { }
} else {
this.connector.add(solrdocs);
synchronized (this.docBuffer) {
for (SolrInputDocument solrdoc: solrdocs) {
String id = (String) solrdoc.getFieldValue(CollectionSchema.id.getSolrFieldName());
if (this.processHandler.isAlive()) {
this.docBuffer.put(id, solrdoc);
} else {
this.connector.add(solrdoc);
}
}
}
if (MemoryControl.shortStatus() || this.docBuffer.size() > this.updateCapacity) {
commitDocBuffer();
}
}
@@ -352,26 +301,36 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
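// getDocumentById() now consults the miss cache and the docBuffer before asking the wrapped
// connector, so a freshly buffered document is visible without a flush.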
public SolrDocument getDocumentById(final String id, String... fields) throws IOException {
assert id.length() == Word.commonHashLength : "wrong id: " + id;
if (this.missCache.contains(id)) return null;
if (containsDeleteInProcessQueue(id)) return null;
SolrInputDocument idoc = getFromProcessQueue(id);
if (idoc != null) { cacheSuccessSign(); return ClientUtils.toSolrDocument(idoc); }
SolrDocument doc = this.connector.getDocumentById(id, AbstractSolrConnector.ensureEssentialFieldsIncluded(fields));
if (doc == null) {
SolrInputDocument idoc = this.docBuffer.get(id);
if (idoc != null) {
//System.out.println("*** docBuffer cache hit; docBuffer.size() = " + docBuffer.size());
//Thread.dumpStack();
return ClientUtils.toSolrDocument(idoc);
}
SolrDocument solrdoc = this.connector.getDocumentById(id, AbstractSolrConnector.ensureEssentialFieldsIncluded(fields));
if (solrdoc == null) {
this.missCache.add(id);
this.metadataCache.remove(id);
} else {
updateCache(id, AbstractSolrConnector.getMetadata(doc));
updateCache(id, AbstractSolrConnector.getMetadata(solrdoc));
}
return doc;
return solrdoc;
}
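// The query methods below flush the docBuffer first so that responses, result lists, counts,
// and facets reflect all pending updates.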
@Override
public QueryResponse getResponseByParams(ModifiableSolrParams query) throws IOException, SolrException {
commitDocBuffer();
return this.connector.getResponseByParams(query);
}
@Override
public SolrDocumentList getDocumentListByParams(ModifiableSolrParams params) throws IOException, SolrException {
commitDocBuffer();
SolrDocumentList sdl = this.connector.getDocumentListByParams(params);
for (SolrDocument doc: sdl) {
String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
updateCache(id, AbstractSolrConnector.getMetadata(doc));
}
return sdl;
}
@@ -383,6 +342,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
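// getDocumentListByQuery() keeps its fast path for single-document id lookups (offset 0,
// count 1, query of the form id:"<hash>" or id:<hash>); the body of that special case falls
// outside this hunk.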
@Override
public SolrDocumentList getDocumentListByQuery(String querystring, String sort, int offset, int count, String... fields) throws IOException, SolrException {
commitDocBuffer();
if (offset == 0 && count == 1 && querystring.startsWith("id:") &&
((querystring.length() == 17 && querystring.charAt(3) == '"' && querystring.charAt(16) == '"') ||
querystring.length() == 15)) {
@@ -393,34 +353,30 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
}
SolrDocumentList sdl = this.connector.getDocumentListByQuery(querystring, sort, offset, count, AbstractSolrConnector.ensureEssentialFieldsIncluded(fields));
/*
Iterator<SolrDocument> i = sdl.iterator();
while (i.hasNext()) {
SolrDocument doc = i.next();
String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
if (doc != null) updateIdCache(id, AbstractSolrConnector.getLoadDate(doc));
}
*/
return sdl;
}
@Override
public long getCountByQuery(String querystring) throws IOException {
commitDocBuffer();
return this.connector.getCountByQuery(querystring);
}
@Override
public Map<String, ReversibleScoreMap<String>> getFacets(String query, int maxresults, String... fields) throws IOException {
commitDocBuffer();
return this.connector.getFacets(query, maxresults, fields);
}
@Override
public BlockingQueue<SolrDocument> concurrentDocumentsByQuery(String querystring, String sort, int offset, int maxcount, long maxtime, int buffersize, final int concurrency, String... fields) {
commitDocBuffer();
return this.connector.concurrentDocumentsByQuery(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency, fields);
}
@Override
public BlockingQueue<String> concurrentIDsByQuery(String querystring, String sort, int offset, int maxcount, long maxtime, int buffersize, final int concurrency) {
commitDocBuffer();
return this.connector.concurrentIDsByQuery(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency);
}