@ -53,13 +53,15 @@ public class DocumentIndex extends Segment {
private static final RankingProfile textRankingDefault = new RankingProfile ( QueryParams . CONTENTDOM_TEXT ) ;
//private Bitfield zeroConstraint = new Bitfield(4);
File poison = new File ( "." ) ;
BlockingQueue < File > queue ;
Worker [ ] worker ;
private final static File poison = new File ( "." ) ;
private BlockingQueue < File > queue ;
private Worker [ ] worker ;
private CallbackListener callback ;
public DocumentIndex ( Log log , final File segmentPath , int cachesize ) throws IOException {
public DocumentIndex ( Log log , final File segmentPath , CallbackListener callback , int cachesize ) throws IOException {
super ( log , segmentPath , cachesize , targetFileSize * 4 - 1 , false , false ) ;
int cores = Runtime . getRuntime ( ) . availableProcessors ( ) + 1 ;
this . callback = callback ;
this . queue = new LinkedBlockingQueue < File > ( cores * 300 ) ;
this . worker = new Worker [ cores ] ;
for ( int i = 0 ; i < cores ; i + + ) {
@ -68,8 +70,8 @@ public class DocumentIndex extends Segment {
}
}
public DocumentIndex ( final File segmentPath , int cachesize ) throws IOException {
this ( new Log ( "DocumentIndex" ) , segmentPath , ca chesize) ;
public DocumentIndex ( final File segmentPath , CallbackListener callback , int cachesize ) throws IOException {
this ( new Log ( "DocumentIndex" ) , segmentPath , ca llback, ca chesize) ;
}
class Worker extends Thread {
@ -78,6 +80,7 @@ public class DocumentIndex extends Segment {
try {
while ( ( f = queue . take ( ) ) ! = poison ) try {
add ( f ) ;
if ( callback ! = null ) callback . commitIndex ( f ) ;
} catch ( IOException e ) {
if ( e . getMessage ( ) . indexOf ( "cannot parse" ) < 0 ) e . printStackTrace ( ) ;
}
@ -85,6 +88,17 @@ public class DocumentIndex extends Segment {
}
}
/ * *
* get the number of pending documents in the indexing queue
* /
public int pending ( ) {
return this . queue . size ( ) ;
}
public void clearQueue ( ) {
this . queue . clear ( ) ;
}
/ * *
* put a single file into the index
* @param file
@ -214,6 +228,10 @@ public class DocumentIndex extends Segment {
super . close ( ) ;
}
public interface CallbackListener {
public void commitIndex ( File f ) ;
}
public static void main ( String [ ] args ) {
// first argument: path to segment
// second argument: either 'add' or 'search'
@ -226,17 +244,22 @@ public class DocumentIndex extends Segment {
if ( args . length < 3 ) return ;
File segmentPath = new File ( args [ 0 ] ) ;
System . out . println ( "using index files at " + segmentPath . getAbsolutePath ( ) ) ;
CallbackListener callback = new CallbackListener ( ) {
public void commitIndex ( File f ) {
System . out . println ( "indexed: " + f . toString ( ) ) ;
}
} ;
try {
if ( args [ 1 ] . equals ( "add" ) ) {
File f = new File ( args [ 2 ] ) ;
DocumentIndex di = new DocumentIndex ( segmentPath , 100000 ) ;
DocumentIndex di = new DocumentIndex ( segmentPath , callback , 100000 ) ;
di . addConcurrent ( f ) ;
di . close ( ) ;
} else {
String query = "" ;
for ( int i = 2 ; i < args . length ; i + + ) query + = args [ i ] ;
query . trim ( ) ;
DocumentIndex di = new DocumentIndex ( segmentPath , 100000 ) ;
DocumentIndex di = new DocumentIndex ( segmentPath , callback , 100000 ) ;
ArrayList < File > results = di . find ( query ) ;
for ( File f : results ) {
if ( f ! = null ) System . out . println ( f . toString ( ) ) ;