@ -89,8 +89,8 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
Collection < SolrInputDocument > docs = new ArrayList < SolrInputDocument > ( getmore + 1 ) ;
docs . add ( doc ) ;
String id = ( String ) doc . getFieldValue ( CollectionSchema . id . getSolrFieldName ( ) ) ;
long date = AbstractSolrConnector . getLoadDate ( doc ) ;
updateIdCache ( id , date ) ;
Metadata md = AbstractSolrConnector . getMetadata ( doc ) ;
updateIdCache ( id , m d) ;
for ( int i = 0 ; i < getmore ; i + + ) {
SolrInputDocument d = ConcurrentUpdateSolrConnector . this . updateQueue . take ( ) ;
if ( d = = POISON_DOCUMENT ) {
@ -99,8 +99,8 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
}
docs . add ( d ) ;
id = ( String ) d . getFieldValue ( CollectionSchema . id . getSolrFieldName ( ) ) ;
date = AbstractSolrConnector . get LoadDate ( d ) ;
updateIdCache ( id , date ) ;
m d = AbstractSolrConnector . get Metadata ( d ) ;
updateIdCache ( id , m d) ;
}
//ConcurrentLog.info("ConcurrentUpdateSolrConnector", "sending " + docs.size() + " documents to solr");
try {
@ -112,8 +112,8 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
// if there is only a single document, send this directly to solr
//ConcurrentLog.info("ConcurrentUpdateSolrConnector", "sending one document to solr");
String id = ( String ) doc . getFieldValue ( CollectionSchema . id . getSolrFieldName ( ) ) ;
long date = AbstractSolrConnector . getLoadDate ( doc ) ;
updateIdCache ( id , date ) ;
Metadata md = AbstractSolrConnector . getMetadata ( doc ) ;
updateIdCache ( id , m d) ;
try {
ConcurrentUpdateSolrConnector . this . connector . add ( doc ) ;
} catch ( final OutOfMemoryError e ) {
@ -134,15 +134,15 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
}
}
}
private ARC < String , Long > idCache ;
private ARC < String , Metadata > idCache ;
private BlockingQueue < SolrInputDocument > updateQueue ;
private BlockingQueue < String > deleteQueue ;
private Thread deletionHandler , updateHandler ;
public ConcurrentUpdateSolrConnector ( SolrConnector connector , int updateCapacity , int idCacheCapacity , int concurrency ) {
this . connector = connector ;
this . idCache = new ConcurrentARC < String , Long > ( idCacheCapacity , concurrency ) ; // url hash to load time
this . idCache = new ConcurrentARC < String , Metadata > ( idCacheCapacity , concurrency ) ; // url hash to load time
this . updateQueue = new ArrayBlockingQueue < SolrInputDocument > ( updateCapacity ) ;
this . deleteQueue = new LinkedBlockingQueue < String > ( ) ;
this . deletionHandler = null ;
@ -192,16 +192,18 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
return null ;
}
private long existIdFromUpdateQueue ( String id ) {
if ( this . updateQueue . size ( ) = = 0 ) return - 1 ;
private Metadata existIdFromUpdateQueue ( String id ) {
if ( this . updateQueue . size ( ) = = 0 ) return null ;
Iterator < SolrInputDocument > i = this . updateQueue . iterator ( ) ;
while ( i . hasNext ( ) ) {
SolrInputDocument doc = i . next ( ) ;
if ( doc = = null ) break ;
String docID = ( String ) doc . getFieldValue ( CollectionSchema . id . getSolrFieldName ( ) ) ;
if ( docID ! = null & & docID . equals ( id ) ) return AbstractSolrConnector . getLoadDate ( doc ) ;
if ( docID ! = null & & docID . equals ( id ) ) {
return AbstractSolrConnector . getMetadata ( doc ) ;
}
}
return - 1 ;
return null ;
}
private void removeIdFromUpdateQueue ( String id ) {
@ -231,10 +233,10 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
}
}
private void updateIdCache ( String id , long time ) {
private void updateIdCache ( final String id , final Metadata md ) {
if ( id = = null ) return ;
if ( MemoryControl . shortStatus ( ) ) this . idCache . clear ( ) ;
this . idCache . put ( id , time ) ;
this . idCache . put ( id , md ) ;
}
public void ensureAliveDeletionHandler ( ) {
@ -361,25 +363,23 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
}
@Override
public long getLoadTime ( String id ) throws IOException {
Long date = this . idCache . get ( id ) ;
if ( date ! = null ) { cacheSuccessSign ( ) ; return date . longValue ( ) ; }
if ( existIdFromDeleteQueue ( id ) ) { cacheSuccessSign ( ) ; return - 1 ; }
long d = existIdFromUpdateQueue ( id ) ;
if ( d > = 0 ) { cacheSuccessSign ( ) ; return d ; }
d = this . connector . getLoadTime ( id ) ;
if ( d > = 0 ) {
updateIdCache ( id , d ) ;
return d ;
}
return - 1 ;
public Metadata getMetadata ( String id ) throws IOException {
Metadata md = this . idCache . get ( id ) ;
if ( md ! = null ) { cacheSuccessSign ( ) ; return md ; }
if ( existIdFromDeleteQueue ( id ) ) { cacheSuccessSign ( ) ; return null ; }
md = existIdFromUpdateQueue ( id ) ;
if ( md ! = null ) { cacheSuccessSign ( ) ; return md ; }
md = this . connector . getMetadata ( id ) ;
if ( md = = null ) return null ;
updateIdCache ( id , md ) ;
return md ;
}
@Override
public void add ( SolrInputDocument solrdoc ) throws IOException , SolrException {
String id = ( String ) solrdoc . getFieldValue ( CollectionSchema . id . getSolrFieldName ( ) ) ;
removeIdFromDeleteQueue ( id ) ;
updateIdCache ( id , AbstractSolrConnector . get LoadDate ( solrdoc ) ) ;
updateIdCache ( id , AbstractSolrConnector . get Metadata ( solrdoc ) ) ;
if ( this . updateHandler . isAlive ( ) ) {
try { this . updateQueue . put ( solrdoc ) ; } catch ( final InterruptedException e ) { }
} else {
@ -392,7 +392,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
for ( SolrInputDocument doc : solrdocs ) {
String id = ( String ) doc . getFieldValue ( CollectionSchema . id . getSolrFieldName ( ) ) ;
removeIdFromDeleteQueue ( id ) ;
updateIdCache ( id , AbstractSolrConnector . get LoadDate ( doc ) ) ;
updateIdCache ( id , AbstractSolrConnector . get Metadata ( doc ) ) ;
}
if ( this . updateHandler . isAlive ( ) ) {
for ( SolrInputDocument doc : solrdocs ) try { this . updateQueue . put ( doc ) ; } catch ( final InterruptedException e ) { }
@ -407,7 +407,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
SolrInputDocument idoc = getFromUpdateQueue ( id ) ;
if ( idoc ! = null ) { cacheSuccessSign ( ) ; return ClientUtils . toSolrDocument ( idoc ) ; }
SolrDocument doc = this . connector . getDocumentById ( id , AbstractSolrConnector . ensureEssentialFieldsIncluded ( fields ) ) ;
if ( doc ! = null ) updateIdCache ( id , AbstractSolrConnector . get LoadDate ( doc ) ) ;
if ( doc ! = null ) updateIdCache ( id , AbstractSolrConnector . get Metadata ( doc ) ) ;
return doc ;
}