@ -51,13 +51,11 @@ public class ServerShard extends SolrServer {
_dummyOKResponse . setResponse ( new NamedList < Object > ( ) ) ;
_dummyOKResponse . setResponse ( new NamedList < Object > ( ) ) ;
}
}
private final ArrayList < SolrServer > server ;
private final ShardSelection shards ;
private final ShardSelection sharding ;
private final boolean writeEnabled ;
private final boolean writeEnabled ;
public ServerShard ( ArrayList < SolrServer > server , final ShardSelection . Method method , final boolean writeEnabled ) {
public ServerShard ( ArrayList < SolrServer > server , final ShardSelection . Method method , final boolean writeEnabled ) {
this . server = server ;
this . shards = new ShardSelection ( server , method ) ;
this . sharding = new ShardSelection ( method , this . server . size ( ) ) ;
this . writeEnabled = writeEnabled ;
this . writeEnabled = writeEnabled ;
}
}
@ -70,7 +68,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse add ( Collection < SolrInputDocument > docs ) throws SolrServerException , IOException {
public UpdateResponse add ( Collection < SolrInputDocument > docs ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrInputDocument doc : docs ) ur = server . get ( this . sharding . select ( doc ) ) . add ( doc ) ;
for ( SolrInputDocument doc : docs ) ur = this . shards . server4write ( doc ) . add ( doc ) ;
return ur ; // TODO: this accumlation of update responses is wrong, but sufficient (because we do not evaluate it)
return ur ; // TODO: this accumlation of update responses is wrong, but sufficient (because we do not evaluate it)
}
}
@ -85,7 +83,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse add ( Collection < SolrInputDocument > docs , int commitWithinMs ) throws SolrServerException , IOException {
public UpdateResponse add ( Collection < SolrInputDocument > docs , int commitWithinMs ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrInputDocument doc : docs ) ur = server . get ( this . sharding . select ( doc ) ) . add ( doc , commitWithinMs ) ;
for ( SolrInputDocument doc : docs ) ur = this . shards . server4write ( doc ) . add ( doc , commitWithinMs ) ;
return ur ;
return ur ;
}
}
@ -98,7 +96,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse addBeans ( Collection < ? > beans ) throws SolrServerException , IOException {
public UpdateResponse addBeans ( Collection < ? > beans ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . addBeans ( beans ) ;
for ( SolrServer s : this . shards ) ur = s . addBeans ( beans ) ;
return ur ;
return ur ;
}
}
@ -113,7 +111,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse addBeans ( Collection < ? > beans , int commitWithinMs ) throws SolrServerException , IOException {
public UpdateResponse addBeans ( Collection < ? > beans , int commitWithinMs ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . addBeans ( beans , commitWithinMs ) ;
for ( SolrServer s : this . shards ) ur = s . addBeans ( beans , commitWithinMs ) ;
return ur ;
return ur ;
}
}
@ -125,7 +123,7 @@ public class ServerShard extends SolrServer {
@Override
@Override
public UpdateResponse add ( SolrInputDocument doc ) throws SolrServerException , IOException {
public UpdateResponse add ( SolrInputDocument doc ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
return server . get ( this . sharding . select ( doc ) ) . add ( doc ) ;
return this . shards . server4write ( doc ) . add ( doc ) ;
}
}
/ * *
/ * *
@ -138,7 +136,7 @@ public class ServerShard extends SolrServer {
@Override
@Override
public UpdateResponse add ( SolrInputDocument doc , int commitWithinMs ) throws SolrServerException , IOException {
public UpdateResponse add ( SolrInputDocument doc , int commitWithinMs ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
return server . get ( this . sharding . select ( doc ) ) . add ( doc , commitWithinMs ) ;
return this . shards . server4write ( doc ) . add ( doc , commitWithinMs ) ;
}
}
/ * *
/ * *
@ -150,7 +148,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse addBean ( Object obj ) throws IOException , SolrServerException {
public UpdateResponse addBean ( Object obj ) throws IOException , SolrServerException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . addBean ( obj ) ;
for ( SolrServer s : this . shards ) ur = s . addBean ( obj ) ;
return ur ;
return ur ;
}
}
@ -165,7 +163,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse addBean ( Object obj , int commitWithinMs ) throws IOException , SolrServerException {
public UpdateResponse addBean ( Object obj , int commitWithinMs ) throws IOException , SolrServerException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . addBean ( obj , commitWithinMs ) ;
for ( SolrServer s : this . shards ) ur = s . addBean ( obj , commitWithinMs ) ;
return ur ;
return ur ;
}
}
@ -179,7 +177,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse commit ( ) throws SolrServerException , IOException {
public UpdateResponse commit ( ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . commit ( ) ;
for ( SolrServer s : this . shards ) ur = s . commit ( ) ;
return ur ;
return ur ;
}
}
@ -195,7 +193,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse optimize ( ) throws SolrServerException , IOException {
public UpdateResponse optimize ( ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . optimize ( ) ;
for ( SolrServer s : this . shards ) ur = s . optimize ( ) ;
return ur ;
return ur ;
}
}
@ -209,7 +207,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse commit ( boolean waitFlush , boolean waitSearcher ) throws SolrServerException , IOException {
public UpdateResponse commit ( boolean waitFlush , boolean waitSearcher ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . commit ( waitFlush , waitSearcher ) ;
for ( SolrServer s : this . shards ) ur = s . commit ( waitFlush , waitSearcher ) ;
return ur ;
return ur ;
}
}
@ -224,7 +222,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse commit ( boolean waitFlush , boolean waitSearcher , boolean softCommit ) throws SolrServerException , IOException {
public UpdateResponse commit ( boolean waitFlush , boolean waitSearcher , boolean softCommit ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . commit ( waitFlush , waitSearcher , softCommit ) ;
for ( SolrServer s : this . shards ) ur = s . commit ( waitFlush , waitSearcher , softCommit ) ;
return ur ;
return ur ;
}
}
@ -240,7 +238,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse optimize ( boolean waitFlush , boolean waitSearcher ) throws SolrServerException , IOException {
public UpdateResponse optimize ( boolean waitFlush , boolean waitSearcher ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . optimize ( waitFlush , waitSearcher ) ;
for ( SolrServer s : this . shards ) ur = s . optimize ( waitFlush , waitSearcher ) ;
return ur ;
return ur ;
}
}
@ -257,7 +255,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse optimize ( boolean waitFlush , boolean waitSearcher , int maxSegments ) throws SolrServerException , IOException {
public UpdateResponse optimize ( boolean waitFlush , boolean waitSearcher , int maxSegments ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . optimize ( waitFlush , waitSearcher , maxSegments ) ;
for ( SolrServer s : this . shards ) ur = s . optimize ( waitFlush , waitSearcher , maxSegments ) ;
return ur ;
return ur ;
}
}
@ -273,7 +271,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse rollback ( ) throws SolrServerException , IOException {
public UpdateResponse rollback ( ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . rollback ( ) ;
for ( SolrServer s : this . shards ) ur = s . rollback ( ) ;
return ur ;
return ur ;
}
}
@ -286,7 +284,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse deleteById ( String id ) throws SolrServerException , IOException {
public UpdateResponse deleteById ( String id ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . deleteById ( id ) ;
for ( SolrServer s : this . shards . server4read ( ) ) ur = s . deleteById ( id ) ;
return ur ;
return ur ;
}
}
@ -301,7 +299,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse deleteById ( String id , int commitWithinMs ) throws SolrServerException , IOException {
public UpdateResponse deleteById ( String id , int commitWithinMs ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . deleteById ( id , commitWithinMs ) ;
for ( SolrServer s : this . shards . server4read ( ) ) ur = s . deleteById ( id , commitWithinMs ) ;
return ur ;
return ur ;
}
}
@ -314,7 +312,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse deleteById ( List < String > ids ) throws SolrServerException , IOException {
public UpdateResponse deleteById ( List < String > ids ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . deleteById ( ids ) ;
for ( SolrServer s : this . shards . server4read ( ) ) ur = s . deleteById ( ids ) ;
return ur ;
return ur ;
}
}
@ -329,7 +327,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse deleteById ( List < String > ids , int commitWithinMs ) throws SolrServerException , IOException {
public UpdateResponse deleteById ( List < String > ids , int commitWithinMs ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . deleteById ( ids , commitWithinMs ) ;
for ( SolrServer s : this . shards . server4read ( ) ) ur = s . deleteById ( ids , commitWithinMs ) ;
return ur ;
return ur ;
}
}
@ -342,7 +340,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse deleteByQuery ( String query ) throws SolrServerException , IOException {
public UpdateResponse deleteByQuery ( String query ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . deleteByQuery ( query ) ;
for ( SolrServer s : this . shards . server4read ( ) ) ur = s . deleteByQuery ( query ) ;
return ur ;
return ur ;
}
}
@ -357,7 +355,7 @@ public class ServerShard extends SolrServer {
public UpdateResponse deleteByQuery ( String query , int commitWithinMs ) throws SolrServerException , IOException {
public UpdateResponse deleteByQuery ( String query , int commitWithinMs ) throws SolrServerException , IOException {
if ( ! this . writeEnabled ) return _dummyOKResponse ;
if ( ! this . writeEnabled ) return _dummyOKResponse ;
UpdateResponse ur = null ;
UpdateResponse ur = null ;
for ( SolrServer s : server ) ur = s . deleteByQuery ( query , commitWithinMs ) ;
for ( SolrServer s : this . shards . server4read ( ) ) ur = s . deleteByQuery ( query , commitWithinMs ) ;
return ur ;
return ur ;
}
}
@ -367,7 +365,7 @@ public class ServerShard extends SolrServer {
* /
* /
@Override
@Override
public SolrPingResponse ping ( ) throws SolrServerException , IOException {
public SolrPingResponse ping ( ) throws SolrServerException , IOException {
for ( SolrServer s : server ) {
for ( SolrServer s : this . shards ) {
SolrPingResponse spr = s . ping ( ) ;
SolrPingResponse spr = s . ping ( ) ;
if ( spr ! = null ) return spr ;
if ( spr ! = null ) return spr ;
}
}
@ -380,11 +378,15 @@ public class ServerShard extends SolrServer {
* /
* /
@Override
@Override
public QueryResponse query ( final SolrParams params ) throws SolrServerException {
public QueryResponse query ( final SolrParams params ) throws SolrServerException {
List < SolrServer > qs = this . shards . server4read ( ) ;
if ( qs . size ( ) = = 1 ) {
return qs . get ( 0 ) . query ( params ) ;
}
final Collection < QueryResponse > qrl = new ConcurrentLinkedQueue < QueryResponse > ( ) ;
// concurrently call all shards
// concurrently call all shards
final Collection < QueryResponse > qrl = new ConcurrentLinkedQueue < QueryResponse > ( ) ;
List < Thread > t = new ArrayList < Thread > ( ) ;
List < Thread > t = new ArrayList < Thread > ( ) ;
for ( final SolrServer s : server ) {
for ( final SolrServer s : q s) {
Thread t0 = new Thread ( ) {
Thread t0 = new Thread ( ) {
@Override
@Override
public void run ( ) {
public void run ( ) {
@ -414,11 +416,15 @@ public class ServerShard extends SolrServer {
* /
* /
@Override
@Override
public QueryResponse query ( final SolrParams params , final METHOD method ) throws SolrServerException {
public QueryResponse query ( final SolrParams params , final METHOD method ) throws SolrServerException {
List < SolrServer > qs = this . shards . server4read ( ) ;
if ( qs . size ( ) = = 1 ) {
return qs . get ( 0 ) . query ( params , method ) ;
}
final Collection < QueryResponse > qrl = new ConcurrentLinkedQueue < QueryResponse > ( ) ;
final Collection < QueryResponse > qrl = new ConcurrentLinkedQueue < QueryResponse > ( ) ;
// concurrently call all shards
// concurrently call all shards
List < Thread > t = new ArrayList < Thread > ( ) ;
List < Thread > t = new ArrayList < Thread > ( ) ;
for ( final SolrServer s : server ) {
for ( final SolrServer s : q s) {
Thread t0 = new Thread ( ) {
Thread t0 = new Thread ( ) {
@Override
@Override
public void run ( ) {
public void run ( ) {
@ -464,14 +470,14 @@ public class ServerShard extends SolrServer {
@Override
@Override
public NamedList < Object > request ( final SolrRequest request ) throws SolrServerException , IOException {
public NamedList < Object > request ( final SolrRequest request ) throws SolrServerException , IOException {
ResponseAccumulator acc = new ResponseAccumulator ( ) ;
ResponseAccumulator acc = new ResponseAccumulator ( ) ;
for ( SolrServer s : server ) acc . addResponse ( s . request ( request ) ) ;
for ( SolrServer s : this . shards . server4read ( ) ) acc . addResponse ( s . request ( request ) ) ;
return acc . getAccumulatedResponse ( ) ;
return acc . getAccumulatedResponse ( ) ;
}
}
@Override
@Override
public DocumentObjectBinder getBinder ( ) {
public DocumentObjectBinder getBinder ( ) {
DocumentObjectBinder db ;
DocumentObjectBinder db ;
for ( SolrServer s : server ) {
for ( SolrServer s : this . shards ) {
db = s . getBinder ( ) ;
db = s . getBinder ( ) ;
if ( db ! = null ) return db ;
if ( db ! = null ) return db ;
}
}
@ -480,7 +486,7 @@ public class ServerShard extends SolrServer {
@Override
@Override
public void shutdown ( ) {
public void shutdown ( ) {
for ( SolrServer s : server ) {
for ( SolrServer s : this . shards ) {
s . shutdown ( ) ;
s . shutdown ( ) ;
}
}
}
}