@@ -1252,166 +1252,180 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
// process all documents in collection
final Map<String, Long> hostExtentCache = new HashMap<String, Long>(); // a mapping from the host id to the number of documents which contain this host-id
final Set<String> uniqueURLs = new ConcurrentHashSet<String>(); // will be used in a concurrent environment
final Set<String> omitFields = new HashSet<String>();
omitFields.add(CollectionSchema.process_sxt.getSolrFieldName());
omitFields.add(CollectionSchema.harvestkey_s.getSolrFieldName());
final Collection<String> failids = new ArrayList<String>();
final AtomicInteger countcheck = new AtomicInteger(0);
final AtomicInteger proccount = new AtomicInteger();
final AtomicInteger proccount_referencechange = new AtomicInteger();
final AtomicInteger proccount_citationchange = new AtomicInteger();
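// note: uniqueURLs is a ConcurrentHashSet because it is shared by the rewrite
// threads started below; hostExtentCache is a plain HashMap that those threads
// also read and write, so if strict consistency matters it may need external
// synchronization or a ConcurrentHashMap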
try {
    // partitioning of the index, get a facet for a partitioning key
    final long count = collectionConnector.getCountByQuery(collection1query);
    String partitioningKey = CollectionSchema.responsetime_i.getSolrFieldName();
    Map<String, ReversibleScoreMap<String>> partitioningFacet = collectionConnector.getFacets(collection1query, 100000, partitioningKey);
    ReversibleScoreMap<String> partitioning = partitioningFacet.get(partitioningKey);
    long emptyCount = collectionConnector.getCountByQuery(partitioningKey + ":\"\" AND (" + collection1query + ")");
    if (emptyCount > 0) partitioning.inc("", (int) emptyCount);
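    // documents without a value in the partitioning field do not show up in the
    // facet, so they are counted with a separate query and added back as an extra
    // "" partition; this keeps the sum over all partitions equal to count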
    for (String partitioningValue : partitioning) {
        String partitioningQuery = partitioningKey + ":\"" + partitioningValue + "\" AND (" + collection1query + ")";
        postprocessingActivity = "collecting " + partitioning.get(partitioningValue) + " documents from partition \"" + partitioningValue + "\" (overall " + count + ") from the collection for harvestkey " + harvestkey + ", partitioned by " + partitioningKey;
        // start collection of documents
        final long start = System.currentTimeMillis();
        final int concurrency = Math.max(1, Math.min((int) (MemoryControl.available() / (100L * 1024L * 1024L)), Runtime.getRuntime().availableProcessors()));
        //final int concurrency = 1;
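        // concurrency heuristic: roughly one rewrite thread per 100 MB of free
        // heap, capped at the number of processor cores and never less than 1;
        // e.g. 1.5 GB available on an 8-core machine gives min(15, 8) = 8 threads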
        final boolean reference_computation = this.contains(CollectionSchema.references_i) &&
                this.contains(CollectionSchema.references_internal_i) &&
                this.contains(CollectionSchema.references_external_i) &&
                this.contains(CollectionSchema.references_exthosts_i);
        ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
        final BlockingQueue<SolrDocument> docs = collectionConnector.concurrentDocumentsByQuery(
                partitioningQuery,
                (this.contains(CollectionSchema.http_unique_b) || this.contains(CollectionSchema.www_unique_b)) ?
                        CollectionSchema.host_subdomain_s.getSolrFieldName() + " asc," + // sort on subdomain to get hosts without subdomain first; that gives an opportunity to set the www_unique_b flag to false
                        CollectionSchema.url_protocol_s.getSolrFieldName() + " asc"      // sort on protocol to get http before https; that gives an opportunity to set the http_unique_b flag to false
                        : null, // null sort is faster!
                0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true,
                byPartialUpdate ?
                new String[]{
                        // the following fields are needed to perform the postprocessing
                        // and should only be used for partial updates; for full updates use a
                        // full list of fields to avoid LazyInstantiation which has poor performance
                        CollectionSchema.id.getSolrFieldName(),
                        CollectionSchema.sku.getSolrFieldName(),
                        CollectionSchema.harvestkey_s.getSolrFieldName(),
                        CollectionSchema.process_sxt.getSolrFieldName(),
                        CollectionSchema.canonical_equal_sku_b.getSolrFieldName(),
                        CollectionSchema.canonical_s.getSolrFieldName(),
                        CollectionSchema.exact_signature_l.getSolrFieldName(),
                        CollectionSchema.fuzzy_signature_l.getSolrFieldName(),
                        CollectionSchema.title_exact_signature_l.getSolrFieldName(),
                        CollectionSchema.description_exact_signature_l.getSolrFieldName(),
                        CollectionSchema.host_id_s.getSolrFieldName(),
                        CollectionSchema.host_s.getSolrFieldName(),
                        CollectionSchema.host_subdomain_s.getSolrFieldName(),
                        CollectionSchema.url_chars_i.getSolrFieldName(),
                        CollectionSchema.url_protocol_s.getSolrFieldName(),
                        CollectionSchema.httpstatus_i.getSolrFieldName(),
                        CollectionSchema.inboundlinkscount_i.getSolrFieldName(),
                        CollectionSchema.robots_i.getSolrFieldName()} :
                this.allFields());
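        // the connector streams matching documents into the returned BlockingQueue
        // and (presumably one per consuming thread, since the take() loops below
        // rely on it) terminates the stream with AbstractSolrConnector.POISON_DOCUMENT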
        final Thread[] rewriteThread = new Thread[concurrency];
        for (int rewrite_start = 0; rewrite_start < concurrency; rewrite_start++) {
            rewriteThread[rewrite_start] = new Thread() {
                @Override
                public void run() {
                    SolrDocument doc;
                    try {
                        while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
                            // for each to-be-processed entry work on the process tag
                            Collection<Object> proctags = doc.getFieldValues(CollectionSchema.process_sxt.getSolrFieldName());
                            final String u = (String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName());
                            final String i = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
                            if (proctags == null || proctags.size() == 0) {
                                // this should not happen since we collected the documents using a process_sxt:[* TO *] term
                                ConcurrentLog.warn("CollectionConfiguration", "no process_sxt entry for url " + u + ", id=" + i);
                                continue;
                            }
                            try {
                                DigestURL url = new DigestURL(u, ASCII.getBytes(i));
                                byte[] id = url.hash();
                                SolrInputDocument sid = byPartialUpdate ? new SolrInputDocument() : collection.toSolrInputDocument(doc, omitFields);
                                sid.setField(CollectionSchema.id.getSolrFieldName(), i);
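                                // sid built above: with byPartialUpdate it starts out empty and only
                                // the fields touched below are set, so it can be sent as a partial
                                // (atomic) update; otherwise it is a full copy of the stored document
                                // minus omitFields and will later be re-added as a whole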
                                for (Object tag : proctags) try {
                                    // switch over tag types
                                    ProcessType tagtype = ProcessType.valueOf((String) tag);
                                    if (tagtype == ProcessType.CITATION &&
                                        collection.contains(CollectionSchema.cr_host_count_i) &&
                                        collection.contains(CollectionSchema.cr_host_chance_d) &&
                                        collection.contains(CollectionSchema.cr_host_norm_i)) {
                                        CRV crv = rankings.remove(ASCII.String(id)); // remove (rather than get) the CRV because it is not needed again; this frees some memory
                                        if (crv != null) {
                                            sid.setField(CollectionSchema.cr_host_count_i.getSolrFieldName(), crv.count);
                                            sid.setField(CollectionSchema.cr_host_chance_d.getSolrFieldName(), crv.cr);
                                            sid.setField(CollectionSchema.cr_host_norm_i.getSolrFieldName(), crv.crn);
                                            proccount_citationchange.incrementAndGet();
                                        }
                                    }
                                    if (tagtype == ProcessType.UNIQUE) {
                                        postprocessing_http_unique(segment, doc, sid, url);
                                        postprocessing_www_unique(segment, doc, sid, url);
                                        postprocessing_doublecontent(segment, uniqueURLs, doc, sid, url);
                                    }
                                } catch (IllegalArgumentException e) {}
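                                // ProcessType.valueOf throws IllegalArgumentException for a tag string
                                // that names no known ProcessType constant; swallowing it here skips
                                // unknown process tags instead of failing the whole document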
                                // compute references
                                if (reference_computation) {
                                    String hosthash = url.hosthash();
                                    if (!hostExtentCache.containsKey(hosthash)) {
                                        StringBuilder q = new StringBuilder();
                                        q.append(CollectionSchema.host_id_s.getSolrFieldName()).append(":\"").append(hosthash).append("\" AND ").append(CollectionSchema.httpstatus_i.getSolrFieldName()).append(":200");
                                        long hostExtentCount = segment.fulltext().getDefaultConnector().getCountByQuery(q.toString());
                                        hostExtentCache.put(hosthash, hostExtentCount);
                                    }
                                    if (postprocessing_references(rrCache, sid, url, hostExtentCache)) proccount_referencechange.incrementAndGet();
                                }
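                                // the host-extent count query built above has the form (the host hash
                                // "f5Nf1B" is a made-up example):
                                //   host_id_s:"f5Nf1B" AND httpstatus_i:200
                                // i.e. the number of successfully loaded documents of one host, computed
                                // once per host and reused for every document of the same host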
                                // all processing steps checked, remove the processing and harvesting key
                                if (byPartialUpdate) {
                                    sid.setField(CollectionSchema.process_sxt.getSolrFieldName(), null); // setting this to null will cause a removal when doing a partial update
                                    sid.setField(CollectionSchema.harvestkey_s.getSolrFieldName(), null);
                                } else {
                                    sid.removeField(CollectionSchema.process_sxt.getSolrFieldName());
                                    sid.removeField(CollectionSchema.harvestkey_s.getSolrFieldName());
                                }
                                // with standard solr fields selected, the sid now contains the fields
                                // id, http_unique_b, www_unique_b, references_i, references_internal_i, references_external_i, references_exthosts_i, host_extent_i
                                // and the value for host_extent_i is by default 2147483647
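                                // the two branches differ because the documents differ: the partial
                                // update transmits the null value, which tells Solr to delete the
                                // stored field, while the full update simply re-adds the copied
                                // document without the two fields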
                                // send back to index
                                //collectionConnector.deleteById(i);
                                if (byPartialUpdate) {
                                    collectionConnector.update(sid);
                                } else {
                                    collectionConnector.add(sid);
                                }
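                                // update(sid) transfers only the fields set above as a partial update,
                                // add(sid) replaces the stored document completely; the commented-out
                                // deleteById hints at an earlier delete-then-re-add approach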
                                long thiscount = proccount.incrementAndGet(); allcount.incrementAndGet();
                                if (thiscount % 100 == 0) {
                                    postprocessingActivity = "postprocessed " + thiscount + " from " + count + " collection documents; " +
                                            (thiscount * 60000L / (System.currentTimeMillis() - start)) + " ppm; " +
                                            ((System.currentTimeMillis() - start) * (count - thiscount) / thiscount / 60000) + " minutes remaining";
                                    ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
                                }
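                                // progress estimate above: thiscount * 60000 / elapsedMs is the rate in
                                // documents per minute (ppm), and elapsedMs * (count - thiscount) /
                                // thiscount / 60000 extrapolates the remaining minutes; e.g. 5000 of
                                // 100000 documents after 10 minutes gives 500 ppm and 190 minutes left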
                            } catch (final Throwable e1) {
                                ConcurrentLog.logException(e1);
                                failids.add(i);
                            }
                            countcheck.incrementAndGet();
                        }
                    } catch (InterruptedException e) {
                        ConcurrentLog.logException(e);
                    }
                }
            };
            rewriteThread[rewrite_start].start();
        }
        // wait for termination
        for (int rewrite_start = 0; rewrite_start < concurrency; rewrite_start++) rewriteThread[rewrite_start].join();
    }
    if (failids.size() > 0) {
        ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: deleting " + failids.size() + " documents which have permanent execution fails");
        collectionConnector.deleteByIds(failids);
    }
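    // failed documents are deleted rather than kept: they still carry their
    // process_sxt tag and would otherwise be selected and fail again on every
    // subsequent postprocessing run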
    if (count != countcheck.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous collection document count for harvestkey " + harvestkey + ": expected=" + count + ", counted=" + countcheck + "; countquery=" + collection1query); // big gap for harvestkey = null
    ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: re-calculated " + proccount + " new documents, " +
            proccount_referencechange + " reference-count changes, " +
            proccount_citationchange + " citation ranking changes.");
} catch (final InterruptedException e2) {
    ConcurrentLog.warn("CollectionConfiguration", e2.getMessage(), e2);
} catch (IOException e3) {