~vcs-imports/lucene/trunk

« back to all changes in this revision

Viewing changes to solr/core/src/java/org/apache/solr/cloud/ZkController.java

  • Committer: markrmiller
  • Date: 2012-02-18 02:00:03 UTC
  • Revision ID: svn-v4:13f79535-47bb-0310-9956-ffa450edef68:lucene/dev/trunk:1245836
SOLR-3126: hardening around peer sync and tests

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
import java.io.File;
21
21
import java.io.IOException;
22
22
import java.net.InetAddress;
 
23
import java.net.MalformedURLException;
23
24
import java.util.Collections;
24
25
import java.util.HashMap;
25
26
import java.util.Iterator;
32
33
import java.util.regex.Matcher;
33
34
import java.util.regex.Pattern;
34
35
 
 
36
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
 
37
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
35
38
import org.apache.solr.common.SolrException;
36
39
import org.apache.solr.common.SolrException.ErrorCode;
37
40
import org.apache.solr.common.cloud.CloudState;
177
180
              //Overseer.createClientNodes(zkClient, getNodeName());
178
181
 
179
182
              ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
180
 
              overseerElector.joinElection(context);
 
183
              overseerElector.joinElection(context, null);
181
184
              zkStateReader.createClusterStateWatchersAndUpdate();
182
185
              
183
186
              List<CoreDescriptor> descriptors = registerOnReconnect
184
187
                  .getCurrentDescriptors();
185
188
              if (descriptors != null) {
186
189
                // before registering as live, make sure everyone is in a
187
 
                // recovery state
 
190
                // down state
188
191
                for (CoreDescriptor descriptor : descriptors) {
189
 
                  final String shardZkNodeName = getNodeName() + "_"
 
192
                  final String coreZkNodeName = getNodeName() + "_"
190
193
                      + descriptor.getName();
191
 
                  publishAsDown(getBaseUrl(), descriptor, shardZkNodeName,
 
194
                  publishAsDown(getBaseUrl(), descriptor, coreZkNodeName,
192
195
                      descriptor.getName());
 
196
                  waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
193
197
                }
194
198
              }
195
199
              
 
200
 
196
201
              // we have to register as live first to pick up docs in the buffer
197
202
              createEphemeralLiveNode();
198
203
              
218
223
            }
219
224
 
220
225
          }
 
226
 
 
227
 
221
228
        });
222
229
    cmdExecutor = new ZkCmdExecutor();
223
230
    leaderElector = new LeaderElector(zkClient);
337
344
      overseerElector = new LeaderElector(zkClient);
338
345
      ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
339
346
      overseerElector.setup(context);
340
 
      overseerElector.joinElection(context);
 
347
      overseerElector.joinElection(context, null);
341
348
      zkStateReader.createClusterStateWatchersAndUpdate();
342
349
      
343
350
    } catch (IOException e) {
478
485
    String shardId = cloudDesc.getShardId();
479
486
 
480
487
    Map<String,String> props = new HashMap<String,String>();
 
488
 // we only put a subset of props into the leader node
481
489
    props.put(ZkStateReader.BASE_URL_PROP, baseUrl);
482
490
    props.put(ZkStateReader.CORE_NAME_PROP, coreName);
483
491
    props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
484
 
    props.put(ZkStateReader.ROLES_PROP, cloudDesc.getRoles());
485
 
    props.put(ZkStateReader.STATE_PROP, ZkStateReader.DOWN);
 
492
 
486
493
 
487
494
    if (log.isInfoEnabled()) {
488
495
        log.info("Register shard - core:" + coreName + " address:"
489
496
            + baseUrl + " shardId:" + shardId);
490
497
    }
491
498
 
492
 
    // we only put a subset of props into the leader node
493
 
    ZkNodeProps leaderProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
494
 
        props.get(ZkStateReader.BASE_URL_PROP), ZkStateReader.CORE_NAME_PROP,
495
 
        props.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.NODE_NAME_PROP,
496
 
        props.get(ZkStateReader.NODE_NAME_PROP));
497
 
    
498
 
 
499
 
    joinElection(collection, coreZkNodeName, shardId, leaderProps);
 
499
    ZkNodeProps leaderProps = new ZkNodeProps(props);
500
500
    
501
501
    // rather than look in the cluster state file, we go straight to the zknodes
502
502
    // here, because on cluster restart there could be stale leader info in the
503
503
    // cluster state node that won't be updated for a moment
504
 
    String leaderUrl = getLeaderUrl(collection, cloudDesc.getShardId());
 
504
    String leaderUrl = getLeaderProps(collection, cloudDesc.getShardId()).getCoreUrl();
505
505
    
506
506
    // now wait until our currently cloud state contains the latest leader
507
507
    String cloudStateLeader = zkStateReader.getLeaderUrl(collection, cloudDesc.getShardId(), 30000);
573
573
  }
574
574
  
575
575
  /**
576
 
   * Get leader URL directly from zk nodes.
 
576
   * Get leader props directly from zk nodes.
577
577
   * 
578
578
   * @param collection
579
579
   * @param slice
581
581
   * @throws KeeperException
582
582
   * @throws InterruptedException
583
583
   */
584
 
  private String getLeaderUrl(final String collection, final String slice)
 
584
  private ZkCoreNodeProps getLeaderProps(final String collection, final String slice)
585
585
      throws KeeperException, InterruptedException {
586
586
    int iterCount = 60;
587
587
    while (iterCount-- > 0)
591
591
            true);
592
592
        ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(
593
593
            ZkNodeProps.load(data));
594
 
        return leaderProps.getCoreUrl();
 
594
        return leaderProps;
595
595
      } catch (NoNodeException e) {
596
596
        Thread.sleep(500);
597
597
      }
600
600
 
601
601
 
602
602
  private void joinElection(final String collection,
603
 
      final String shardZkNodeName, String shardId, ZkNodeProps leaderProps) throws InterruptedException, KeeperException, IOException {
 
603
      final String shardZkNodeName, String shardId, ZkNodeProps leaderProps, SolrCore core) throws InterruptedException, KeeperException, IOException {
604
604
    ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
605
605
        collection, shardZkNodeName, leaderProps, this, cc);
606
606
    
607
607
    leaderElector.setup(context);
608
 
    leaderElector.joinElection(context);
 
608
    leaderElector.joinElection(context, core);
609
609
  }
610
610
 
611
611
 
671
671
    publishState(cd, shardZkNodeName, coreName, finalProps);
672
672
  }
673
673
 
674
 
  public void publish(SolrCore core, String state) {
675
 
    CoreDescriptor cd = core.getCoreDescriptor();
 
674
  public void publish(CoreDescriptor cd, String state) {
676
675
    Map<String,String> finalProps = new HashMap<String,String>();
677
676
    finalProps.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
678
 
    finalProps.put(ZkStateReader.CORE_NAME_PROP, core.getName());
 
677
    finalProps.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
679
678
    finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
680
679
    finalProps.put(ZkStateReader.STATE_PROP, state);
681
 
    publishState(cd, getNodeName() + "_" + core.getName(),
682
 
        core.getName(), finalProps);
 
680
    publishState(cd, getNodeName() + "_" + cd.getName(),
 
681
        cd.getName(), finalProps);
683
682
  }
684
683
  
685
684
  void publishAsDown(String baseUrl,
959
958
    uploadToZK(zkClient, dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
960
959
  }
961
960
 
 
961
  public void preRegisterSetup(SolrCore core, CoreDescriptor cd) {
 
962
    // before becoming available, make sure we are not live and active
 
963
    // this also gets us our assigned shard id if it was not specified
 
964
    publish(cd, ZkStateReader.DOWN);
 
965
    
 
966
    String shardId = cd.getCloudDescriptor().getShardId();
 
967
    
 
968
    Map<String,String> props = new HashMap<String,String>();
 
969
    // we only put a subset of props into the leader node
 
970
    props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
 
971
    props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
 
972
    props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
 
973
    
 
974
    final String coreZkNodeName = getNodeName() + "_" + cd.getName();
 
975
    ZkNodeProps ourProps = new ZkNodeProps(props);
 
976
    String collection = cd.getCloudDescriptor()
 
977
        .getCollectionName();
 
978
    
 
979
    try {
 
980
      joinElection(collection, coreZkNodeName, shardId, ourProps, core);
 
981
    } catch (InterruptedException e) {
 
982
      // Restore the interrupted status
 
983
      Thread.currentThread().interrupt();
 
984
      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
 
985
    } catch (KeeperException e) {
 
986
      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
 
987
    } catch (IOException e) {
 
988
      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
 
989
    }
 
990
 
 
991
      
 
992
      waitForLeaderToSeeDownState(cd, coreZkNodeName);
 
993
    
 
994
  }
 
995
 
 
996
  private ZkCoreNodeProps waitForLeaderToSeeDownState(
 
997
      CoreDescriptor descriptor, final String shardZkNodeName) {
 
998
    CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
 
999
    String collection = cloudDesc.getCollectionName();
 
1000
    String shard = cloudDesc.getShardId();
 
1001
    ZkCoreNodeProps leaderProps;
 
1002
    try {
 
1003
      // go straight to zk, not the cloud state - we must have current info
 
1004
      leaderProps = getLeaderProps(collection, shard);
 
1005
    } catch (InterruptedException e) {
 
1006
      // Restore the interrupted status
 
1007
      Thread.currentThread().interrupt();
 
1008
      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
 
1009
    } catch (KeeperException e) {
 
1010
      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
 
1011
    }
 
1012
    
 
1013
    String leaderBaseUrl = leaderProps.getBaseUrl();
 
1014
    String leaderCoreName = leaderProps.getCoreName();
 
1015
    
 
1016
    String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(),
 
1017
        descriptor.getName());
 
1018
    
 
1019
    boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl);
 
1020
    if (!isLeader && !SKIP_AUTO_RECOVERY) {
 
1021
      // wait until the leader sees us as down before we are willing to accept
 
1022
      // updates.
 
1023
      CommonsHttpSolrServer server = null;
 
1024
      try {
 
1025
        server = new CommonsHttpSolrServer(leaderBaseUrl);
 
1026
      } catch (MalformedURLException e) {
 
1027
        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
 
1028
            e);
 
1029
      }
 
1030
      server.setConnectionTimeout(45000);
 
1031
      server.setSoTimeout(45000);
 
1032
      WaitForState prepCmd = new WaitForState();
 
1033
      prepCmd.setCoreName(leaderCoreName);
 
1034
      prepCmd.setNodeName(getNodeName());
 
1035
      prepCmd.setCoreNodeName(shardZkNodeName);
 
1036
      prepCmd.setState(ZkStateReader.DOWN);
 
1037
      prepCmd.setCheckLive(false);
 
1038
      
 
1039
      try {
 
1040
        server.request(prepCmd);
 
1041
      } catch (Exception e) {
 
1042
        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
 
1043
            "Could not talk to the leader", e);
 
1044
      }
 
1045
      server.shutdown();
 
1046
    }
 
1047
    return leaderProps;
 
1048
  }
 
1049
 
962
1050
}