177
180
//Overseer.createClientNodes(zkClient, getNodeName());
179
182
ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
180
overseerElector.joinElection(context);
183
overseerElector.joinElection(context, null);
181
184
zkStateReader.createClusterStateWatchersAndUpdate();
183
186
List<CoreDescriptor> descriptors = registerOnReconnect
184
187
.getCurrentDescriptors();
185
188
if (descriptors != null) {
186
189
// before registering as live, make sure everyone is in a
188
191
for (CoreDescriptor descriptor : descriptors) {
189
final String shardZkNodeName = getNodeName() + "_"
192
final String coreZkNodeName = getNodeName() + "_"
190
193
+ descriptor.getName();
191
publishAsDown(getBaseUrl(), descriptor, shardZkNodeName,
194
publishAsDown(getBaseUrl(), descriptor, coreZkNodeName,
192
195
descriptor.getName());
196
waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
196
201
// we have to register as live first to pick up docs in the buffer
197
202
createEphemeralLiveNode();
478
485
String shardId = cloudDesc.getShardId();
480
487
Map<String,String> props = new HashMap<String,String>();
488
// we only put a subset of props into the leader node
481
489
props.put(ZkStateReader.BASE_URL_PROP, baseUrl);
482
490
props.put(ZkStateReader.CORE_NAME_PROP, coreName);
483
491
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
484
props.put(ZkStateReader.ROLES_PROP, cloudDesc.getRoles());
485
props.put(ZkStateReader.STATE_PROP, ZkStateReader.DOWN);
487
494
if (log.isInfoEnabled()) {
488
495
log.info("Register shard - core:" + coreName + " address:"
489
496
+ baseUrl + " shardId:" + shardId);
492
// we only put a subset of props into the leader node
493
ZkNodeProps leaderProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
494
props.get(ZkStateReader.BASE_URL_PROP), ZkStateReader.CORE_NAME_PROP,
495
props.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.NODE_NAME_PROP,
496
props.get(ZkStateReader.NODE_NAME_PROP));
499
joinElection(collection, coreZkNodeName, shardId, leaderProps);
499
ZkNodeProps leaderProps = new ZkNodeProps(props);
501
501
// rather than look in the cluster state file, we go straight to the zknodes
502
502
// here, because on cluster restart there could be stale leader info in the
503
503
// cluster state node that won't be updated for a moment
504
String leaderUrl = getLeaderUrl(collection, cloudDesc.getShardId());
504
String leaderUrl = getLeaderProps(collection, cloudDesc.getShardId()).getCoreUrl();
506
506
// now wait until our current cloud state contains the latest leader
507
507
String cloudStateLeader = zkStateReader.getLeaderUrl(collection, cloudDesc.getShardId(), 30000);
602
602
private void joinElection(final String collection,
603
final String shardZkNodeName, String shardId, ZkNodeProps leaderProps) throws InterruptedException, KeeperException, IOException {
603
final String shardZkNodeName, String shardId, ZkNodeProps leaderProps, SolrCore core) throws InterruptedException, KeeperException, IOException {
604
604
ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
605
605
collection, shardZkNodeName, leaderProps, this, cc);
607
607
leaderElector.setup(context);
608
leaderElector.joinElection(context);
608
leaderElector.joinElection(context, core);
671
671
publishState(cd, shardZkNodeName, coreName, finalProps);
674
public void publish(SolrCore core, String state) {
675
CoreDescriptor cd = core.getCoreDescriptor();
674
public void publish(CoreDescriptor cd, String state) {
676
675
Map<String,String> finalProps = new HashMap<String,String>();
677
676
finalProps.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
678
finalProps.put(ZkStateReader.CORE_NAME_PROP, core.getName());
677
finalProps.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
679
678
finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
680
679
finalProps.put(ZkStateReader.STATE_PROP, state);
681
publishState(cd, getNodeName() + "_" + core.getName(),
682
core.getName(), finalProps);
680
publishState(cd, getNodeName() + "_" + cd.getName(),
681
cd.getName(), finalProps);
685
684
void publishAsDown(String baseUrl,
959
958
uploadToZK(zkClient, dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
961
public void preRegisterSetup(SolrCore core, CoreDescriptor cd) {
962
// before becoming available, make sure we are not live and active
963
// this also gets us our assigned shard id if it was not specified
964
publish(cd, ZkStateReader.DOWN);
966
String shardId = cd.getCloudDescriptor().getShardId();
968
Map<String,String> props = new HashMap<String,String>();
969
// we only put a subset of props into the leader node
970
props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
971
props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
972
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
974
final String coreZkNodeName = getNodeName() + "_" + cd.getName();
975
ZkNodeProps ourProps = new ZkNodeProps(props);
976
String collection = cd.getCloudDescriptor()
977
.getCollectionName();
980
joinElection(collection, coreZkNodeName, shardId, ourProps, core);
981
} catch (InterruptedException e) {
982
// Restore the interrupted status
983
Thread.currentThread().interrupt();
984
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
985
} catch (KeeperException e) {
986
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
987
} catch (IOException e) {
988
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
992
waitForLeaderToSeeDownState(cd, coreZkNodeName);
996
private ZkCoreNodeProps waitForLeaderToSeeDownState(
997
CoreDescriptor descriptor, final String shardZkNodeName) {
998
CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
999
String collection = cloudDesc.getCollectionName();
1000
String shard = cloudDesc.getShardId();
1001
ZkCoreNodeProps leaderProps;
1003
// go straight to zk, not the cloud state - we must have current info
1004
leaderProps = getLeaderProps(collection, shard);
1005
} catch (InterruptedException e) {
1006
// Restore the interrupted status
1007
Thread.currentThread().interrupt();
1008
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
1009
} catch (KeeperException e) {
1010
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
1013
String leaderBaseUrl = leaderProps.getBaseUrl();
1014
String leaderCoreName = leaderProps.getCoreName();
1016
String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(),
1017
descriptor.getName());
1019
boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl);
1020
if (!isLeader && !SKIP_AUTO_RECOVERY) {
1021
// wait until the leader sees us as down before we are willing to accept
1023
CommonsHttpSolrServer server = null;
1025
server = new CommonsHttpSolrServer(leaderBaseUrl);
1026
} catch (MalformedURLException e) {
1027
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
1030
server.setConnectionTimeout(45000);
1031
server.setSoTimeout(45000);
1032
WaitForState prepCmd = new WaitForState();
1033
prepCmd.setCoreName(leaderCoreName);
1034
prepCmd.setNodeName(getNodeName());
1035
prepCmd.setCoreNodeName(shardZkNodeName);
1036
prepCmd.setState(ZkStateReader.DOWN);
1037
prepCmd.setCheckLive(false);
1040
server.request(prepCmd);
1041
} catch (Exception e) {
1042
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
1043
"Could not talk to the leader", e);