@ -51,6 +51,7 @@ import net.yacy.cora.document.id.DigestURL;
import net.yacy.cora.document.id.MultiProtocolURL ;
import net.yacy.cora.federate.solr.connector.AbstractSolrConnector ;
import net.yacy.cora.federate.solr.connector.SolrConnector ;
import net.yacy.cora.federate.solr.connector.SolrConnector.LoadTimeURL ;
import net.yacy.cora.federate.yacy.CacheStrategy ;
import net.yacy.cora.order.Base64Order ;
import net.yacy.cora.order.ByteOrder ;
@ -110,8 +111,9 @@ public class Segment {
public static final long targetFileSize = 64 * 1024 * 1024 ; // 256 MB
public static final int writeBufferSize = 4 * 1024 * 1024 ;
public static final String termIndexName = "text.index" ;
public static final String citationIndexName = "citation.index" ;
public static final String citationIndexName = "citation.index" ;
public static final String firstseenIndexName = "firstseen.index" ;
public static final String loadtimeIndexName = "loadtime.index" ;
// the reference factory
public static final ReferenceFactory < WordReference > wordReferenceFactory = new WordReferenceFactory ( ) ;
@ -122,9 +124,10 @@ public class Segment {
private final File segmentPath ;
protected final Fulltext fulltext ;
protected IndexCell < WordReference > termIndex ;
protected IndexCell < CitationReference > urlCitationIndex ;
protected IndexTable firstSeenIndex ;
protected IODispatcher merger = null ; // shared iodispatcher for kelondro indexes
private IndexCell < CitationReference > urlCitationIndex ;
private IndexTable firstSeenIndex ;
private IndexTable loadTimeIndex ;
private IODispatcher merger = null ; // shared iodispatcher for kelondro indexes
/ * *
* create a new Segment
@ -138,20 +141,21 @@ public class Segment {
this . log = log ;
this . segmentPath = segmentPath ;
archivePath . mkdirs ( ) ;
this . fulltext = new Fulltext ( segmentPath , archivePath , collectionConfiguration , webgraphConfiguration ) ;
this . termIndex = null ;
this . urlCitationIndex = null ;
this . firstSeenIndex = new IndexTable ( new File ( segmentPath , firstseenIndexName ) , 12 , 8 , false , false ) ;
this . loadTimeIndex = new IndexTable ( new File ( segmentPath , loadtimeIndexName ) , 12 , 8 , false , false ) ;
}
public boolean connectedRWI ( ) {
return this . termIndex ! = null ;
}
public void connectRWI ( final int entityCacheMaxSize , final long maxFileSize ) throws IOException {
if ( this . termIndex ! = null ) return ;
if ( this . merger = = null ) { // init shared iodispatcher if none running
this . merger = new IODispatcher ( 2 , 2 , writeBufferSize ) ;
this . merger . start ( ) ;
@ -166,7 +170,7 @@ public class Segment {
targetFileSize ,
maxFileSize ,
writeBufferSize ,
merger) ;
this . merger) ;
}
public void disconnectRWI ( ) {
@ -196,7 +200,7 @@ public class Segment {
targetFileSize ,
maxFileSize ,
writeBufferSize ,
merger) ;
this . merger) ;
}
public void disconnectCitation ( ) {
@ -208,7 +212,7 @@ public class Segment {
public int citationCount ( ) {
return this . urlCitationIndex = = null ? 0 : this . urlCitationIndex . sizesMax ( ) ;
}
public long citationSegmentCount ( ) {
return this . urlCitationIndex = = null ? 0 : this . urlCitationIndex . getSegmentCount ( ) ;
}
@ -224,27 +228,31 @@ public class Segment {
public IndexCell < CitationReference > urlCitation ( ) {
return this . urlCitationIndex ;
}
public IndexTable firstSeen ( ) {
public IndexTable firstSeen Index ( ) {
return this . firstSeenIndex ;
}
public IndexTable loadTimeIndex ( ) {
return this . loadTimeIndex ;
}
public ReferenceReportCache getReferenceReportCache ( ) {
return new ReferenceReportCache ( ) ;
}
public class ReferenceReportCache {
private final Map < String , ReferenceReport > cache ;
public ReferenceReportCache ( ) {
this . cache = new ConcurrentHashMap < String , ReferenceReport > ( ) ;
}
public ReferenceReport getReferenceReport ( final String id , final boolean acceptSelfReference ) throws IOException {
ReferenceReport rr = cache. get ( id ) ;
if ( MemoryControl . shortStatus ( ) ) cache. clear ( ) ;
ReferenceReport rr = this . cache. get ( id ) ;
if ( MemoryControl . shortStatus ( ) ) this . cache. clear ( ) ;
if ( rr ! = null ) return rr ;
try {
rr = new ReferenceReport ( ASCII . getBytes ( id ) , acceptSelfReference ) ;
cache. put ( id , rr ) ;
this . cache. put ( id , rr ) ;
return rr ;
} catch ( final SpaceExceededException e ) {
ConcurrentLog . logException ( e ) ;
@ -252,7 +260,7 @@ public class Segment {
}
}
}
/ * *
* A ReferenceReport object is a container for all references to a specific url .
* The class stores the number of links from domain - internal and domain - external backlinks ,
@ -278,19 +286,19 @@ public class Segment {
CitationReference ref = ri . next ( ) ;
byte [ ] hh = ref . hosthash ( ) ; // host hash
if ( ByteBuffer . equals ( hh , 0 , id , 6 , 6 ) ) {
internalIDs. put ( ref . urlhash ( ) ) ;
internal+ + ;
this . internalIDs. put ( ref . urlhash ( ) ) ;
this . internal+ + ;
} else {
externalHosts. put ( hh ) ;
externalIDs. put ( ref . urlhash ( ) ) ;
external+ + ;
this . externalHosts. put ( hh ) ;
this . externalIDs. put ( ref . urlhash ( ) ) ;
this . external+ + ;
}
}
} catch ( SpaceExceededException e ) {
// the Citation Index got too large, we ignore the problem and hope that a second solr index is attached which will take over now
if ( Segment . this . fulltext . useWebgraph ( ) ) internalIDs. clear ( ) ;
if ( Segment . this . fulltext . useWebgraph ( ) ) this . internalIDs. clear ( ) ;
}
if ( ( internalIDs. size ( ) = = 0 | | ! connectedCitation ( ) ) & & Segment . this . fulltext . useWebgraph ( ) ) {
if ( ( this . internalIDs. size ( ) = = 0 | | ! connectedCitation ( ) ) & & Segment . this . fulltext . useWebgraph ( ) ) {
// reqd the references from the webgraph
SolrConnector webgraph = Segment . this . fulltext . getWebgraphConnector ( ) ;
BlockingQueue < SolrDocument > docs = webgraph . concurrentDocumentsByQuery ( "{!cache=false raw f=" + WebgraphSchema . target_id_s . getSolrFieldName ( ) + "}" + ASCII . String ( id ) , WebgraphSchema . source_chars_i . getSolrFieldName ( ) + " asc" , 0 , 10000000 , Long . MAX_VALUE , 100 , 1 , false , WebgraphSchema . source_id_s . getSolrFieldName ( ) ) ;
@ -305,13 +313,13 @@ public class Segment {
System . arraycopy ( refidh , 6 , hh , 0 , 6 ) ;
if ( ByteBuffer . equals ( hh , 0 , id , 6 , 6 ) ) {
if ( acceptSelfReference | | ! Arrays . equals ( refidh , id ) ) {
internalIDs. put ( refidh ) ;
internal+ + ;
this . internalIDs. put ( refidh ) ;
this . internal+ + ;
}
} else {
externalHosts. put ( hh ) ;
externalIDs. put ( refidh ) ;
external+ + ;
this . externalHosts. put ( hh ) ;
this . externalIDs. put ( refidh ) ;
this . external+ + ;
}
}
} catch ( final InterruptedException e ) {
@ -338,12 +346,12 @@ public class Segment {
return this . internalIDs ;
}
}
public long RWICount ( ) {
if ( this . termIndex = = null ) return 0 ;
return this . termIndex . sizesMax ( ) ;
}
public long RWISegmentCount ( ) {
if ( this . termIndex = = null ) return 0 ;
return this . termIndex . getSegmentCount ( ) ;
@ -377,7 +385,7 @@ public class Segment {
return 0 ;
}
}
public void setFirstSeenTime ( final byte [ ] urlhash , long time ) {
if ( urlhash = = null | | time < = 0 ) return ;
try {
@ -387,7 +395,7 @@ public class Segment {
ConcurrentLog . logException ( e ) ;
}
}
public long getFirstSeenTime ( final byte [ ] urlhash ) {
if ( urlhash = = null ) return - 1 ;
try {
@ -397,7 +405,36 @@ public class Segment {
return - 1 ;
}
}
public void setLoadTime ( final byte [ ] urlhash , long time ) {
if ( urlhash = = null | | time < = 0 ) return ;
try {
this . loadTimeIndex . put ( urlhash , time ) ; // ALWAYS overwrite!
} catch ( IOException e ) {
ConcurrentLog . logException ( e ) ;
}
}
public long getLoadTime ( final byte [ ] urlhash ) {
if ( urlhash = = null ) return - 1 ;
try {
return this . loadTimeIndex . get ( urlhash ) ;
} catch ( IOException e ) {
ConcurrentLog . logException ( e ) ;
return - 1 ;
}
}
public LoadTimeURL getLoadTimeURL ( String url , byte [ ] urlhash ) {
long t = getLoadTime ( urlhash ) ;
if ( t < 0 ) return null ;
return new LoadTimeURL ( url , t ) ;
}
public LoadTimeURL getLoadTimeURL ( String url , String id ) {
return getLoadTimeURL ( url , id . getBytes ( ) ) ;
}
/ * *
* check if a given document , identified by url hash as document id exists
* @param id the url hash and document id
@ -467,7 +504,7 @@ public class Segment {
ConcurrentLog . logException ( e ) ;
}
}
public void clearCaches ( ) {
if ( this . urlCitationIndex ! = null ) this . urlCitationIndex . clearCache ( ) ;
if ( this . termIndex ! = null ) this . termIndex . clearCache ( ) ;
@ -483,6 +520,7 @@ public class Segment {
if ( this . fulltext ! = null ) this . fulltext . close ( ) ;
if ( this . urlCitationIndex ! = null ) this . urlCitationIndex . close ( ) ;
if ( this . firstSeenIndex ! = null ) this . firstSeenIndex . close ( ) ;
if ( this . loadTimeIndex ! = null ) this . loadTimeIndex . close ( ) ;
if ( this . merger ! = null ) {
this . merger . terminate ( ) ;
this . merger = null ;
@ -575,11 +613,11 @@ public class Segment {
) {
final CollectionConfiguration collectionConfig = this . fulltext . getDefaultConfiguration ( ) ;
final String language = votedLanguage ( url , url . toNormalform ( true ) , document , condenser ) ; // identification of the language
final CollectionConfiguration . SolrVector vector = collectionConfig . yacy2solr ( this , collections , responseHeader ,
document , condenser , referrerURL , language , crawlProfile . isPushCrawlProfile ( ) ,
this . fulltext ( ) . useWebgraph ( ) ? this . fulltext . getWebgraphConfiguration ( ) : null , sourceName ) ;
return storeDocument ( url , crawlProfile , responseHeader , document , vector , language , condenser ,
searchEvent , sourceName , storeToRWI , proxy , acceptLanguage ) ;
}
@ -599,10 +637,10 @@ public class Segment {
final String acceptLanguage
) {
final long startTime = System . currentTimeMillis ( ) ;
final CollectionConfiguration collectionConfig = this . fulltext . getDefaultConfiguration ( ) ;
final String urlNormalform = url . toNormalform ( true ) ;
// CREATE INDEX
// load some document metadata
final Date loadDate = new Date ( ) ;
@ -619,7 +657,7 @@ public class Segment {
// ENRICH DOCUMENT WITH RANKING INFORMATION
this . fulltext . getDefaultConfiguration ( ) . postprocessing_references ( this . getReferenceReportCache ( ) , vector , url , null ) ;
// CREATE SNAPSHOT
if ( ( url . getProtocol ( ) . equals ( "http" ) | | url . getProtocol ( ) . equals ( "https" ) ) & &
crawlProfile ! = null & & document . getDepth ( ) < = crawlProfile . snapshotMaxdepth ( ) & &
@ -634,13 +672,13 @@ public class Segment {
// STORE IMAGE AND METADATA
Transactions . store ( vector , true , crawlProfile . snapshotLoadImage ( ) , crawlProfile . snapshotReplaceold ( ) , proxy , acceptLanguage ) ;
}
// STORE TO SOLR
this . putDocument ( vector ) ;
List < SolrInputDocument > webgraph = vector . getWebgraphDocuments ( ) ;
String error = null ;
if ( webgraph ! = null & & webgraph . size ( ) > 0 ) {
// write the edges to the webgraph solr index
if ( this . fulltext . useWebgraph ( ) ) {
tryloop : for ( int i = 0 ; i < 20 ; i + + ) {
@ -657,16 +695,18 @@ public class Segment {
}
}
}
}
// REMEMBER FIRST SEEN
setFirstSeenTime ( url . hash ( ) , Math . min ( document . getLastModified ( ) . getTime ( ) , System . currentTimeMillis ( ) ) ) ; // should exist already in the index at this time, but just to make sure
long now = System . currentTimeMillis ( ) ;
setFirstSeenTime ( url . hash ( ) , Math . min ( document . getLastModified ( ) . getTime ( ) , now ) ) ; // should exist already in the index at this time, but just to make sure
setLoadTime ( url . hash ( ) , now ) ; // always overwrites index entry
// write the edges to the citation reference index
if ( this . connectedCitation ( ) ) try {
// we use the subgraph to write the citation index, that shall cause that the webgraph and the citation index is identical
if ( collectionConfig . contains ( CollectionSchema . inboundlinks_protocol_sxt ) | | collectionConfig . contains ( CollectionSchema . inboundlinks_urlstub_sxt ) ) {
Collection < Object > inboundlinks_urlstub = vector . getFieldValues ( CollectionSchema . inboundlinks_urlstub_sxt . getSolrFieldName ( ) ) ;
List < String > inboundlinks_protocol = inboundlinks_urlstub = = null ? null : CollectionConfiguration . indexedList2protocolList ( vector . getFieldValues ( CollectionSchema . inboundlinks_protocol_sxt . getSolrFieldName ( ) ) , inboundlinks_urlstub . size ( ) ) ;
@ -676,7 +716,7 @@ public class Segment {
String referrerhash = id ;
String anchorhash = ASCII . String ( new DigestURL ( targetURL ) . hash ( ) ) ;
if ( referrerhash ! = null & & anchorhash ! = null ) {
urlCitationIndex. add ( ASCII . getBytes ( anchorhash ) , new CitationReference ( ASCII . getBytes ( referrerhash ) , loadDate . getTime ( ) ) ) ;
this . urlCitationIndex. add ( ASCII . getBytes ( anchorhash ) , new CitationReference ( ASCII . getBytes ( referrerhash ) , loadDate . getTime ( ) ) ) ;
}
} catch ( Throwable e ) {
ConcurrentLog . logException ( e ) ;
@ -692,7 +732,7 @@ public class Segment {
String referrerhash = id ;
String anchorhash = ASCII . String ( new DigestURL ( targetURL ) . hash ( ) ) ;
if ( referrerhash ! = null & & anchorhash ! = null ) {
urlCitationIndex. add ( ASCII . getBytes ( anchorhash ) , new CitationReference ( ASCII . getBytes ( referrerhash ) , loadDate . getTime ( ) ) ) ;
this . urlCitationIndex. add ( ASCII . getBytes ( anchorhash ) , new CitationReference ( ASCII . getBytes ( referrerhash ) , loadDate . getTime ( ) ) ) ;
}
} catch ( Throwable e ) {
ConcurrentLog . logException ( e ) ;
@ -702,7 +742,7 @@ public class Segment {
} catch ( Throwable e ) {
ConcurrentLog . logException ( e ) ;
}
if ( error ! = null ) {
ConcurrentLog . severe ( "SOLR" , error + ", PLEASE REPORT TO https://github.com/yacy/yacy_search_server/issues" ) ;
//Switchboard.getSwitchboard().pauseCrawlJob(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL, error);
@ -728,7 +768,7 @@ public class Segment {
UTF8 . getBytes ( language ) ,
docType ,
outlinksSame , outlinksOther ) ;
// iterate over all words of content text
Word wprop = null ;
byte [ ] wordhash ;
@ -744,7 +784,7 @@ public class Segment {
} catch ( final Exception e ) {
ConcurrentLog . logException ( e ) ;
}
// during a search event it is possible that a heuristic is used which aquires index
// data during search-time. To transfer indexed data directly to the search process
// the following lines push the index data additionally to the search process
@ -762,7 +802,7 @@ public class Segment {
}
}
if ( searchEvent ! = null ) searchEvent . addFinalize ( ) ;
// assign the catchall word
ientry . setWord ( wprop = = null ? catchallWord : wprop ) ; // we use one of the word properties as template to get the document characteristics
if ( this . termIndex ! = null ) try { this . termIndex . add ( catchallHash , ientry ) ; } catch ( final Throwable e ) { ConcurrentLog . logException ( e ) ; }