2
memberlist is a library that manages cluster
3
membership and member failure detection using a gossip based protocol.
5
The use cases for such a library are far-reaching: all distributed systems
6
require membership, and memberlist is a re-usable solution to managing
7
cluster membership and node failure detection.
9
memberlist is eventually consistent but converges quickly on average.
10
The speed at which it converges can be heavily tuned via various knobs
11
on the protocol. Node failures are detected and network partitions are partially
12
tolerated by attempting to communicate to potentially dead nodes through
27
type Memberlist struct {
28
sequenceNum uint32 // Local sequence number
29
incarnation uint32 // Local incarnation number
30
numNodes uint32 // Number of known nodes (estimate)
34
shutdownCh chan struct{}
36
leaveBroadcast chan struct{}
38
udpListener *net.UDPConn
39
tcpListener *net.TCPListener
40
handoff chan msgHandoff
43
nodes []*nodeState // Known nodes
44
nodeMap map[string]*nodeState // Maps Addr.String() -> NodeState
47
tickers []*time.Ticker
48
stopTick chan struct{}
52
ackHandlers map[uint32]*ackHandler
54
broadcasts *TransmitLimitedQueue
59
// newMemberlist creates the network listeners.
60
// Does not schedule execution of background maintenance.
61
func newMemberlist(conf *Config) (*Memberlist, error) {
62
if conf.ProtocolVersion < ProtocolVersionMin {
63
return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
64
conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
65
} else if conf.ProtocolVersion > ProtocolVersionMax {
66
return nil, fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
67
conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
70
if len(conf.SecretKey) > 0 {
71
if conf.Keyring == nil {
72
keyring, err := NewKeyring(nil, conf.SecretKey)
76
conf.Keyring = keyring
78
if err := conf.Keyring.AddKey(conf.SecretKey); err != nil {
81
if err := conf.Keyring.UseKey(conf.SecretKey); err != nil {
87
tcpAddr := &net.TCPAddr{IP: net.ParseIP(conf.BindAddr), Port: conf.BindPort}
88
tcpLn, err := net.ListenTCP("tcp", tcpAddr)
90
return nil, fmt.Errorf("Failed to start TCP listener. Err: %s", err)
92
if conf.BindPort == 0 {
93
conf.BindPort = tcpLn.Addr().(*net.TCPAddr).Port
96
udpAddr := &net.UDPAddr{IP: net.ParseIP(conf.BindAddr), Port: conf.BindPort}
97
udpLn, err := net.ListenUDP("udp", udpAddr)
100
return nil, fmt.Errorf("Failed to start UDP listener. Err: %s", err)
103
// Set the UDP receive window size
106
if conf.LogOutput == nil {
107
conf.LogOutput = os.Stderr
109
logger := log.New(conf.LogOutput, "", log.LstdFlags)
113
shutdownCh: make(chan struct{}),
114
leaveBroadcast: make(chan struct{}, 1),
117
handoff: make(chan msgHandoff, 1024),
118
nodeMap: make(map[string]*nodeState),
119
ackHandlers: make(map[uint32]*ackHandler),
120
broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
123
m.broadcasts.NumNodes = func() int {
124
return m.estNumNodes()
132
// Create will create a new Memberlist using the given configuration.
133
// This will not connect to any other node (see Join) yet, but will start
134
// all the listeners to allow other nodes to join this memberlist.
135
// After creating a Memberlist, the configuration given should not be
136
// modified by the user anymore.
137
func Create(conf *Config) (*Memberlist, error) {
138
m, err := newMemberlist(conf)
142
if err := m.setAlive(); err != nil {
150
// Join is used to take an existing Memberlist and attempt to join a cluster
151
// by contacting all the given hosts and performing a state sync. Initially,
152
// the Memberlist only contains our own state, so doing this will cause
153
// remote nodes to become aware of the existence of this node, effectively
154
// joining the cluster.
156
// This returns the number of hosts successfully contacted and an error if
157
// none could be reached. If an error is returned, the node did not successfully
159
func (m *Memberlist) Join(existing []string) (int, error) {
160
// Attempt to join any of them
163
for _, exist := range existing {
164
addrs, port, err := m.resolveAddr(exist)
166
m.logger.Printf("[WARN] memberlist: Failed to resolve %s: %v", exist, err)
171
for _, addr := range addrs {
172
if err := m.pushPullNode(addr, port, true); err != nil {
185
return numSuccess, retErr
188
// resolveAddr is used to resolve the address into an address,
189
// port, and error. If no port is given, use the default
190
func (m *Memberlist) resolveAddr(hostStr string) ([][]byte, uint16, error) {
191
ips := make([][]byte, 0)
193
host, sport, err := net.SplitHostPort(hostStr)
194
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
195
// error, port missing - we can solve this
196
port = uint16(m.config.BindPort)
198
} else if err != nil {
199
// error, but not missing port
200
return ips, port, err
201
} else if lport, err := strconv.ParseUint(sport, 10, 16); err != nil {
202
// error, when parsing port
203
return ips, port, err
209
// Get the addresses that hostPort might resolve to
210
// ResolveTCPAddr requires IPv6 brackets to separate
211
// port numbers whereas ParseIP doesn't, but luckily
212
// SplitHostPort takes care of the brackets
213
if ip := net.ParseIP(host); ip == nil {
214
if pre, err := net.LookupIP(host); err == nil {
215
for _, ip := range pre {
216
ips = append(ips, ip)
219
return ips, port, err
222
ips = append(ips, ip)
225
return ips, port, nil
228
// setAlive is used to mark this node as being alive. This is the same
229
// as if we received an alive notification on our own network channel for
231
func (m *Memberlist) setAlive() error {
232
var advertiseAddr []byte
233
var advertisePort int
234
if m.config.AdvertiseAddr != "" {
235
// If AdvertiseAddr is not empty, then advertise
236
// the given address and port.
237
ip := net.ParseIP(m.config.AdvertiseAddr)
239
return fmt.Errorf("Failed to parse advertise address!")
242
// Ensure IPv4 conversion if necessary
243
if ip4 := ip.To4(); ip4 != nil {
248
advertisePort = m.config.AdvertisePort
250
if m.config.BindAddr == "0.0.0.0" {
251
// Otherwise, if we're not bound to a specific IP,
252
// let's list the interfaces on this machine and use
253
// the first private IP we find.
254
addresses, err := net.InterfaceAddrs()
256
return fmt.Errorf("Failed to get interface addresses! Err: %v", err)
259
// Find private IPv4 address
260
for _, rawAddr := range addresses {
262
switch addr := rawAddr.(type) {
274
if !isPrivateIP(ip.String()) {
282
// Failed to find private IP, error
283
if advertiseAddr == nil {
284
return fmt.Errorf("No private IP address found, and explicit IP not provided")
288
// Use the IP that we're bound to.
289
addr := m.tcpListener.Addr().(*net.TCPAddr)
290
advertiseAddr = addr.IP
293
// Use the port we are bound to.
294
advertisePort = m.tcpListener.Addr().(*net.TCPAddr).Port
297
// Check if this is a public address without encryption
298
addrStr := net.IP(advertiseAddr).String()
299
if !isPrivateIP(addrStr) && !isLoopbackIP(addrStr) && !m.config.EncryptionEnabled() {
300
m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!")
303
// Get the node meta data
305
if m.config.Delegate != nil {
306
meta = m.config.Delegate.NodeMeta(MetaMaxSize)
307
if len(meta) > MetaMaxSize {
308
panic("Node meta data provided is longer than the limit")
313
Incarnation: m.nextIncarnation(),
316
Port: uint16(advertisePort),
319
ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
320
m.config.DelegateProtocolMin, m.config.DelegateProtocolMax,
321
m.config.DelegateProtocolVersion,
324
m.aliveNode(&a, nil, true)
329
// LocalNode is used to return the local Node
330
func (m *Memberlist) LocalNode() *Node {
332
defer m.nodeLock.RUnlock()
333
state := m.nodeMap[m.config.Name]
337
// UpdateNode is used to trigger re-advertising the local node. This is
338
// primarily used with a Delegate to support dynamic updates to the local
339
// meta data. This will block until the update message is successfully
340
// broadcast to a member of the cluster, if any exist, or until a specified
341
// timeout is reached.
342
func (m *Memberlist) UpdateNode(timeout time.Duration) error {
343
// Get the node meta data
345
if m.config.Delegate != nil {
346
meta = m.config.Delegate.NodeMeta(MetaMaxSize)
347
if len(meta) > MetaMaxSize {
348
panic("Node meta data provided is longer than the limit")
352
// Get the existing node
354
state := m.nodeMap[m.config.Name]
357
// Format a new alive message
359
Incarnation: m.nextIncarnation(),
365
ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
366
m.config.DelegateProtocolMin, m.config.DelegateProtocolMax,
367
m.config.DelegateProtocolVersion,
370
notifyCh := make(chan struct{})
371
m.aliveNode(&a, notifyCh, true)
373
// Wait for the broadcast or a timeout
375
var timeoutCh <-chan time.Time
377
timeoutCh = time.After(timeout)
382
return fmt.Errorf("timeout waiting for update broadcast")
388
// SendTo is used to directly send a message to another node, without
389
// the use of the gossip mechanism. This will encode the message as a
390
// user-data message, which a delegate will receive through NotifyMsg.
391
// The actual data is transmitted over UDP, which means this is a
392
// best-effort transmission mechanism, and the maximum size of the
393
// message is the size of a single UDP datagram, after compression.
394
// This method is DEPRECATED in favor of SendToUDP.
395
func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
396
// Encode as a user message
397
buf := make([]byte, 1, len(msg)+1)
398
buf[0] = byte(userMsg)
399
buf = append(buf, msg...)
402
return m.rawSendMsgUDP(to, buf)
405
// SendToUDP is used to directly send a message to another node, without
406
// the use of the gossip mechanism. This will encode the message as a
407
// user-data message, which a delegate will receive through NotifyMsg.
408
// The actual data is transmitted over UDP, which means this is a
409
// best-effort transmission mechanism, and the maximum size of the
410
// message is the size of a single UDP datagram, after compression.
411
func (m *Memberlist) SendToUDP(to *Node, msg []byte) error {
412
// Encode as a user message
413
buf := make([]byte, 1, len(msg)+1)
414
buf[0] = byte(userMsg)
415
buf = append(buf, msg...)
418
destAddr := &net.UDPAddr{IP: to.Addr, Port: int(to.Port)}
419
return m.rawSendMsgUDP(destAddr, buf)
422
// SendToTCP is used to directly send a message to another node, without
423
// the use of the gossip mechanism. This will encode the message as a
424
// user-data message, which a delegate will receive through NotifyMsg.
425
// The actual data is transmitted over TCP, which means delivery
426
// is guaranteed if no error is returned. There is no limit
427
// to the size of the message.
428
func (m *Memberlist) SendToTCP(to *Node, msg []byte) error {
430
destAddr := &net.TCPAddr{IP: to.Addr, Port: int(to.Port)}
431
return m.sendTCPUserMsg(destAddr, msg)
434
// Members returns a list of all known live nodes. The node structures
435
// returned must not be modified. If you wish to modify a Node, make a
437
func (m *Memberlist) Members() []*Node {
439
defer m.nodeLock.RUnlock()
441
nodes := make([]*Node, 0, len(m.nodes))
442
for _, n := range m.nodes {
443
if n.State != stateDead {
444
nodes = append(nodes, &n.Node)
451
// NumMembers returns the number of alive nodes currently known. Between
452
// the time of calling this and calling Members, the number of alive nodes
453
// may have changed, so this shouldn't be used to determine how many
454
// members will be returned by Members.
455
func (m *Memberlist) NumMembers() (alive int) {
457
defer m.nodeLock.RUnlock()
459
for _, n := range m.nodes {
460
if n.State != stateDead {
468
// Leave will broadcast a leave message but will not shutdown the background
469
// listeners, meaning the node will continue participating in gossip and state
472
// This will block until the leave message is successfully broadcast to
473
// a member of the cluster, if any exist, or until a specified timeout
476
// This method is safe to call multiple times, but must not be called
477
// after the cluster is already shut down.
478
func (m *Memberlist) Leave(timeout time.Duration) error {
480
// We can't defer m.nodeLock.Unlock() because m.deadNode will also try to
481
// acquire a lock so we need to Unlock before that.
485
panic("leave after shutdown")
491
state, ok := m.nodeMap[m.config.Name]
494
m.logger.Printf("[WARN] memberlist: Leave but we're not in the node map.")
499
Incarnation: state.Incarnation,
504
// Block until the broadcast goes out
506
var timeoutCh <-chan time.Time
508
timeoutCh = time.After(timeout)
511
case <-m.leaveBroadcast:
513
return fmt.Errorf("timeout waiting for leave broadcast")
523
// Check for any other alive node.
524
func (m *Memberlist) anyAlive() bool {
526
defer m.nodeLock.RUnlock()
527
for _, n := range m.nodes {
528
if n.State != stateDead && n.Name != m.config.Name {
535
// ProtocolVersion returns the protocol version currently in use by
537
func (m *Memberlist) ProtocolVersion() uint8 {
538
// NOTE: This method exists so that in the future we can control
539
// any locking if necessary, if we change the protocol version at
541
return m.config.ProtocolVersion
544
// Shutdown will stop any background maintenance of network activity
545
// for this memberlist, causing it to appear "dead". A leave message
546
// will not be broadcast prior, so the cluster being left will have
547
// to detect this node's shutdown using probing. If you wish to more
548
// gracefully exit the cluster, call Leave prior to shutting down.
550
// This method is safe to call multiple times.
551
func (m *Memberlist) Shutdown() error {
553
defer m.nodeLock.Unlock()
562
m.udpListener.Close()
563
m.tcpListener.Close()