@@ -898,17 +898,19 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
             String query = (harvestkey == null || !segment.fulltext().getDefaultConfiguration().contains(CollectionSchema.harvestkey_s) ? "" : CollectionSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") +
                     CollectionSchema.process_sxt.getSolrFieldName() + ":" + ProcessType.CITATION.toString();
             hostscore = collectionConnector.getFacets(query, 10000000, CollectionSchema.host_s.getSolrFieldName()).get(CollectionSchema.host_s.getSolrFieldName());
             if (hostscore == null) hostscore = new ClusteredScoreMap<String>();
             ConcurrentLog.info("CollectionConfiguration", "collecting " + hostscore.size() + " hosts");
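+            // countcheck counts the hosts actually taken from hostscore; a mismatch with hostscore.size() is reported after the loop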
+            int countcheck = 0;
             for (String host: hostscore.keyList(true)) {
                 // Patch the citation index for links with canonical tags.
                 // This shall fulfill the following requirement:
-                // If a document A links to B and B contains a 'canonical C', then the citation rank coputation shall consider that A links to C and B does not link to C.
+                // If a document A links to B and B contains a 'canonical C', then the citation rank computation shall consider that A links to C and B does not link to C.
                 // To do so, we first must collect all canonical links, find all references to them, get the anchor list of the documents and patch the citation reference of these links
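                 // Example: if a.html links to b.html and b.html declares c.html as canonical,
                 // the citation is recorded as a.html -> c.html instead of a.html -> b.html.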
                 String patchquery = CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + ":[* TO *]";
+                long patchquerycount = collectionConnector.getCountByQuery(patchquery);
                 BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, 0, 10000000, 60000L, 50,
                         CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.canonical_s.getSolrFieldName());
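                 // only the id, sku and canonical_s fields are fetched; nothing more is needed to patch the citation references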
                 SolrDocument doc_B;
+                int patchquerycountcheck = 0;
                 try {
                     while ((doc_B = documents_with_canonical_tag.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
                         // find all documents which link to the canonical doc
@@ -926,10 +928,12 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
                             CitationReference doc_A_citation = doc_A_ids_iterator.next();
                             segment.urlCitation().add(doc_C_url.hash(), doc_A_citation);
                         }
+                        patchquerycountcheck++;
                     }
                 } catch (InterruptedException e) {
                 } catch (SpaceExceededException e) {
                 }
+                if (patchquerycount != patchquerycountcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous patchquery count for host " + host + ": expected=" + patchquerycount + ", counted=" + patchquerycountcheck);
                 // do the citation rank computation
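                 // hosts without any pending citation documents cannot contribute to the ranking and are skipped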
                 if (hostscore.get(host) <= 0) continue;
@@ -939,12 +943,14 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
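                 // iterate the citation rank computation until convergenceStep() signals a stable result, giving up after 30 rounds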
                 while (convergence_attempts++ < 30) {
                     if (crh.convergenceStep()) break;
                 }
-                ConcurrentLog.info("CollectionConfiguration.CRHost", "convergence for host " + host + " after " + convergence_attempts + " steps");
+                ConcurrentLog.info("CollectionConfiguration", "convergence for host " + host + " after " + convergence_attempts + " steps");
                 // we have now the cr for all documents of a specific host; we store them for later use
                 Map<byte[], CRV> crn = crh.normalize();
                 //crh.log(crn);
                 ranking.putAll(crn); // accumulate this here for usage in document update later
+                countcheck++;
             }
+            if (hostscore.size() != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous host count: expected=" + hostscore.size() + ", counted=" + countcheck);
         } catch (final IOException e2) {
             hostscore = new ClusteredScoreMap<String>();
         }
@@ -952,13 +958,15 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
         // process all documents at the webgraph for the outgoing links of this document
         SolrDocument doc;
         if (webgraphConnector != null) {
-            for (String host: hostscore.keyList(true)) {
-                if (hostscore.get(host) <= 0) continue;
-                // select all webgraph edges and modify their cr value
-                BlockingQueue<SolrDocument> docs = webgraphConnector.concurrentDocumentsByQuery(
-                        WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + host + "\"",
-                        0, 10000000, 60000, 50);
-                try {
+            try {
+                for (String host: hostscore.keyList(true)) {
+                    if (hostscore.get(host) <= 0) continue;
+                    // select all webgraph edges and modify their cr value
+                    String query = WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + host + "\"";
+                    long count = webgraphConnector.getCountByQuery(query);
+                    ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph");
+                    BlockingQueue<SolrDocument> docs = webgraphConnector.concurrentDocumentsByQuery(query, 0, 10000000, 60000, 50);
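+                    // the queue is filled concurrently; the read loop below ends when the POISON_DOCUMENT sentinel appears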
+                    int countcheck = 0;
                     while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
                         boolean changed = false;
                         SolrInputDocument sid = segment.fulltext().getWebgraphConfiguration().toSolrInputDocument(doc, null);
@@ -978,21 +986,29 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
                                 webgraphConnector.add(sid);
                             } catch (SolrException e) {
                             } catch (IOException e) {
                             }
                         }
+                        countcheck++;
                     }
-                } catch (final InterruptedException e) {}
+                    if (count != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous webgraph document count for host " + host + ": expected=" + count + ", counted=" + countcheck);
                 }
+            } catch (final IOException e2) {
+                ConcurrentLog.warn("CollectionConfiguration", e2.getMessage(), e2);
+            } catch (final InterruptedException e3) {
+                ConcurrentLog.warn("CollectionConfiguration", e3.getMessage(), e3);
+            }
         }
         // process all documents in collection
-        BlockingQueue<SolrDocument> docs = collectionConnector.concurrentDocumentsByQuery(
-                (harvestkey == null ? "" : CollectionSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") +
-                CollectionSchema.process_sxt.getSolrFieldName() + ":[* TO *]",
-                0, 10000, 60000, 50);
+        String query = (harvestkey == null ? "" : CollectionSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") +
+                CollectionSchema.process_sxt.getSolrFieldName() + ":[* TO *]";
         int proccount = 0, proccount_clickdepthchange = 0, proccount_referencechange = 0, proccount_citationchange = 0, proccount_uniquechange = 0;
         Map<String, Long> hostExtentCache = new HashMap<String, Long>(); // a mapping from the host id to the number of documents which contain this host-id
         Set<String> uniqueURLs = new HashSet<String>();
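+        // select all documents that still carry a process_sxt tag, optionally restricted to the given harvestkey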
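         // uniqueURLs remembers the URLs already seen in this pass, presumably backing the unique-field updates counted in proccount_uniquechange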
         try {
+            long count = collectionConnector.getCountByQuery(query);
+            ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the collection for harvestkey " + harvestkey);
+            BlockingQueue<SolrDocument> docs = collectionConnector.concurrentDocumentsByQuery(query, 0, 10000, 60000, 50);
+            int countcheck = 0;
             while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
                 // for each to-be-processed entry work on the process tag
                 Collection<Object> proctags = doc.getFieldValues(CollectionSchema.process_sxt.getSolrFieldName());
@@ -1031,8 +1047,8 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
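                     // the host extent (number of documents of this host with HTTP status 200) is computed once per host and cached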
                     if (!hostExtentCache.containsKey(hosthash)) {
                         StringBuilder q = new StringBuilder();
                         q.append(CollectionSchema.host_id_s.getSolrFieldName()).append(":\"").append(hosthash).append("\" AND ").append(CollectionSchema.httpstatus_i.getSolrFieldName()).append(":200");
-                        long count = segment.fulltext().getDefaultConnector().getCountByQuery(q.toString());
-                        hostExtentCache.put(hosthash, count);
+                        long hostExtentCount = segment.fulltext().getDefaultConnector().getCountByQuery(q.toString());
+                        hostExtentCache.put(hosthash, hostExtentCount);
                     }
                     if (postprocessing_references(rrCache, doc, sid, url, hostExtentCache)) proccount_referencechange++;
@@ -1047,13 +1063,18 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
                     proccount++;
                 } catch (final Throwable e1) {
                 }
+                countcheck++;
             }
+            if (count != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous collection document count for harvestkey " + harvestkey + ": expected=" + count + ", counted=" + countcheck);
             ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: re-calculated " + proccount + " new documents, " +
                     proccount_clickdepthchange + " clickdepth changes, " +
                     proccount_referencechange + " reference-count changes, " +
                     proccount_uniquechange + " unique field changes, " +
                     proccount_citationchange + " citation ranking changes.");
-        } catch (final InterruptedException e) {
+        } catch (final InterruptedException e2) {
+            ConcurrentLog.warn("CollectionConfiguration", e2.getMessage(), e2);
+        } catch (IOException e3) {
+            ConcurrentLog.warn("CollectionConfiguration", e3.getMessage(), e3);
         }
         return proccount;
     }
@@ -1148,8 +1169,8 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
             if (entry == null || entry.getValue() == null) continue;
             try {
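                 // resolve the document's URL from its id so the log output is human-readable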
                 String url = (String) connector.getDocumentById(ASCII.String(entry.getKey()), CollectionSchema.sku.getSolrFieldName()).getFieldValue(CollectionSchema.sku.getSolrFieldName());
-                ConcurrentLog.info("CollectionConfiguration.CRHost", "CR for " + url);
-                ConcurrentLog.info("CollectionConfiguration.CRHost", ">> " + entry.getValue().toString());
+                ConcurrentLog.info("CollectionConfiguration", "CR for " + url);
+                ConcurrentLog.info("CollectionConfiguration", ">> " + entry.getValue().toString());
             } catch (final IOException e) {
                 ConcurrentLog.logException(e);
             }