~ubuntu-branches/debian/sid/golang-github-hashicorp-memberlist/sid

« back to all changes in this revision

Viewing changes to memberlist.go

  • Committer: Package Import Robot
  • Author(s): Tianon Gravi
  • Date: 2015-10-21 15:39:43 UTC
  • Revision ID: package-import@ubuntu.com-20151021153943-leqgq5d0ss2e4alw
Tags: upstream-0.0~git20150921.0.28424fb
ImportĀ upstreamĀ versionĀ 0.0~git20150921.0.28424fb

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
memberlist is a library that manages cluster
 
3
membership and member failure detection using a gossip based protocol.
 
4
 
 
5
The use cases for such a library are far-reaching: all distributed systems
 
6
require membership, and memberlist is a re-usable solution to managing
 
7
cluster membership and node failure detection.
 
8
 
 
9
memberlist is eventually consistent but converges quickly on average.
 
10
The speed at which it converges can be heavily tuned via various knobs
 
11
on the protocol. Node failures are detected and network partitions are partially
 
12
tolerated by attempting to communicate to potentially dead nodes through
 
13
multiple routes.
 
14
*/
 
15
package memberlist
 
16
 
 
17
import (
 
18
        "fmt"
 
19
        "log"
 
20
        "net"
 
21
        "os"
 
22
        "strconv"
 
23
        "sync"
 
24
        "time"
 
25
)
 
26
 
 
27
type Memberlist struct {
 
28
        sequenceNum uint32 // Local sequence number
 
29
        incarnation uint32 // Local incarnation number
 
30
        numNodes    uint32 // Number of known nodes (estimate)
 
31
 
 
32
        config         *Config
 
33
        shutdown       bool
 
34
        shutdownCh     chan struct{}
 
35
        leave          bool
 
36
        leaveBroadcast chan struct{}
 
37
 
 
38
        udpListener *net.UDPConn
 
39
        tcpListener *net.TCPListener
 
40
        handoff     chan msgHandoff
 
41
 
 
42
        nodeLock sync.RWMutex
 
43
        nodes    []*nodeState          // Known nodes
 
44
        nodeMap  map[string]*nodeState // Maps Addr.String() -> NodeState
 
45
 
 
46
        tickerLock sync.Mutex
 
47
        tickers    []*time.Ticker
 
48
        stopTick   chan struct{}
 
49
        probeIndex int
 
50
 
 
51
        ackLock     sync.Mutex
 
52
        ackHandlers map[uint32]*ackHandler
 
53
 
 
54
        broadcasts *TransmitLimitedQueue
 
55
 
 
56
        logger *log.Logger
 
57
}
 
58
 
 
59
// newMemberlist creates the network listeners.
 
60
// Does not schedule execution of background maintenence.
 
61
func newMemberlist(conf *Config) (*Memberlist, error) {
 
62
        if conf.ProtocolVersion < ProtocolVersionMin {
 
63
                return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
 
64
                        conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
 
65
        } else if conf.ProtocolVersion > ProtocolVersionMax {
 
66
                return nil, fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
 
67
                        conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
 
68
        }
 
69
 
 
70
        if len(conf.SecretKey) > 0 {
 
71
                if conf.Keyring == nil {
 
72
                        keyring, err := NewKeyring(nil, conf.SecretKey)
 
73
                        if err != nil {
 
74
                                return nil, err
 
75
                        }
 
76
                        conf.Keyring = keyring
 
77
                } else {
 
78
                        if err := conf.Keyring.AddKey(conf.SecretKey); err != nil {
 
79
                                return nil, err
 
80
                        }
 
81
                        if err := conf.Keyring.UseKey(conf.SecretKey); err != nil {
 
82
                                return nil, err
 
83
                        }
 
84
                }
 
85
        }
 
86
 
 
87
        tcpAddr := &net.TCPAddr{IP: net.ParseIP(conf.BindAddr), Port: conf.BindPort}
 
88
        tcpLn, err := net.ListenTCP("tcp", tcpAddr)
 
89
        if err != nil {
 
90
                return nil, fmt.Errorf("Failed to start TCP listener. Err: %s", err)
 
91
        }
 
92
        if conf.BindPort == 0 {
 
93
                conf.BindPort = tcpLn.Addr().(*net.TCPAddr).Port
 
94
        }
 
95
 
 
96
        udpAddr := &net.UDPAddr{IP: net.ParseIP(conf.BindAddr), Port: conf.BindPort}
 
97
        udpLn, err := net.ListenUDP("udp", udpAddr)
 
98
        if err != nil {
 
99
                tcpLn.Close()
 
100
                return nil, fmt.Errorf("Failed to start UDP listener. Err: %s", err)
 
101
        }
 
102
 
 
103
        // Set the UDP receive window size
 
104
        setUDPRecvBuf(udpLn)
 
105
 
 
106
        if conf.LogOutput == nil {
 
107
                conf.LogOutput = os.Stderr
 
108
        }
 
109
        logger := log.New(conf.LogOutput, "", log.LstdFlags)
 
110
 
 
111
        m := &Memberlist{
 
112
                config:         conf,
 
113
                shutdownCh:     make(chan struct{}),
 
114
                leaveBroadcast: make(chan struct{}, 1),
 
115
                udpListener:    udpLn,
 
116
                tcpListener:    tcpLn,
 
117
                handoff:        make(chan msgHandoff, 1024),
 
118
                nodeMap:        make(map[string]*nodeState),
 
119
                ackHandlers:    make(map[uint32]*ackHandler),
 
120
                broadcasts:     &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
 
121
                logger:         logger,
 
122
        }
 
123
        m.broadcasts.NumNodes = func() int {
 
124
                return m.estNumNodes()
 
125
        }
 
126
        go m.tcpListen()
 
127
        go m.udpListen()
 
128
        go m.udpHandler()
 
129
        return m, nil
 
130
}
 
131
 
 
132
// Create will create a new Memberlist using the given configuration.
 
133
// This will not connect to any other node (see Join) yet, but will start
 
134
// all the listeners to allow other nodes to join this memberlist.
 
135
// After creating a Memberlist, the configuration given should not be
 
136
// modified by the user anymore.
 
137
func Create(conf *Config) (*Memberlist, error) {
 
138
        m, err := newMemberlist(conf)
 
139
        if err != nil {
 
140
                return nil, err
 
141
        }
 
142
        if err := m.setAlive(); err != nil {
 
143
                m.Shutdown()
 
144
                return nil, err
 
145
        }
 
146
        m.schedule()
 
147
        return m, nil
 
148
}
 
149
 
 
150
// Join is used to take an existing Memberlist and attempt to join a cluster
 
151
// by contacting all the given hosts and performing a state sync. Initially,
 
152
// the Memberlist only contains our own state, so doing this will cause
 
153
// remote nodes to become aware of the existence of this node, effectively
 
154
// joining the cluster.
 
155
//
 
156
// This returns the number of hosts successfully contacted and an error if
 
157
// none could be reached. If an error is returned, the node did not successfully
 
158
// join the cluster.
 
159
func (m *Memberlist) Join(existing []string) (int, error) {
 
160
        // Attempt to join any of them
 
161
        numSuccess := 0
 
162
        var retErr error
 
163
        for _, exist := range existing {
 
164
                addrs, port, err := m.resolveAddr(exist)
 
165
                if err != nil {
 
166
                        m.logger.Printf("[WARN] memberlist: Failed to resolve %s: %v", exist, err)
 
167
                        retErr = err
 
168
                        continue
 
169
                }
 
170
 
 
171
                for _, addr := range addrs {
 
172
                        if err := m.pushPullNode(addr, port, true); err != nil {
 
173
                                retErr = err
 
174
                                continue
 
175
                        }
 
176
                        numSuccess++
 
177
                }
 
178
 
 
179
        }
 
180
 
 
181
        if numSuccess > 0 {
 
182
                retErr = nil
 
183
        }
 
184
 
 
185
        return numSuccess, retErr
 
186
}
 
187
 
 
188
// resolveAddr is used to resolve the address into an address,
 
189
// port, and error. If no port is given, use the default
 
190
func (m *Memberlist) resolveAddr(hostStr string) ([][]byte, uint16, error) {
 
191
        ips := make([][]byte, 0)
 
192
        port := uint16(0)
 
193
        host, sport, err := net.SplitHostPort(hostStr)
 
194
        if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
 
195
                // error, port missing - we can solve this
 
196
                port = uint16(m.config.BindPort)
 
197
                host = hostStr
 
198
        } else if err != nil {
 
199
                // error, but not missing port
 
200
                return ips, port, err
 
201
        } else if lport, err := strconv.ParseUint(sport, 10, 16); err != nil {
 
202
                // error, when parsing port
 
203
                return ips, port, err
 
204
        } else {
 
205
                // no error
 
206
                port = uint16(lport)
 
207
        }
 
208
 
 
209
        // Get the addresses that hostPort might resolve to
 
210
        // ResolveTcpAddr requres ipv6 brackets to separate
 
211
        // port numbers whereas ParseIP doesn't, but luckily
 
212
        // SplitHostPort takes care of the brackets
 
213
        if ip := net.ParseIP(host); ip == nil {
 
214
                if pre, err := net.LookupIP(host); err == nil {
 
215
                        for _, ip := range pre {
 
216
                                ips = append(ips, ip)
 
217
                        }
 
218
                } else {
 
219
                        return ips, port, err
 
220
                }
 
221
        } else {
 
222
                ips = append(ips, ip)
 
223
        }
 
224
 
 
225
        return ips, port, nil
 
226
}
 
227
 
 
228
// setAlive is used to mark this node as being alive. This is the same
 
229
// as if we received an alive notification our own network channel for
 
230
// ourself.
 
231
func (m *Memberlist) setAlive() error {
 
232
        var advertiseAddr []byte
 
233
        var advertisePort int
 
234
        if m.config.AdvertiseAddr != "" {
 
235
                // If AdvertiseAddr is not empty, then advertise
 
236
                // the given address and port.
 
237
                ip := net.ParseIP(m.config.AdvertiseAddr)
 
238
                if ip == nil {
 
239
                        return fmt.Errorf("Failed to parse advertise address!")
 
240
                }
 
241
 
 
242
                // Ensure IPv4 conversion if necessary
 
243
                if ip4 := ip.To4(); ip4 != nil {
 
244
                        ip = ip4
 
245
                }
 
246
 
 
247
                advertiseAddr = ip
 
248
                advertisePort = m.config.AdvertisePort
 
249
        } else {
 
250
                if m.config.BindAddr == "0.0.0.0" {
 
251
                        // Otherwise, if we're not bound to a specific IP,
 
252
                        //let's list the interfaces on this machine and use
 
253
                        // the first private IP we find.
 
254
                        addresses, err := net.InterfaceAddrs()
 
255
                        if err != nil {
 
256
                                return fmt.Errorf("Failed to get interface addresses! Err: %v", err)
 
257
                        }
 
258
 
 
259
                        // Find private IPv4 address
 
260
                        for _, rawAddr := range addresses {
 
261
                                var ip net.IP
 
262
                                switch addr := rawAddr.(type) {
 
263
                                case *net.IPAddr:
 
264
                                        ip = addr.IP
 
265
                                case *net.IPNet:
 
266
                                        ip = addr.IP
 
267
                                default:
 
268
                                        continue
 
269
                                }
 
270
 
 
271
                                if ip.To4() == nil {
 
272
                                        continue
 
273
                                }
 
274
                                if !isPrivateIP(ip.String()) {
 
275
                                        continue
 
276
                                }
 
277
 
 
278
                                advertiseAddr = ip
 
279
                                break
 
280
                        }
 
281
 
 
282
                        // Failed to find private IP, error
 
283
                        if advertiseAddr == nil {
 
284
                                return fmt.Errorf("No private IP address found, and explicit IP not provided")
 
285
                        }
 
286
 
 
287
                } else {
 
288
                        // Use the IP that we're bound to.
 
289
                        addr := m.tcpListener.Addr().(*net.TCPAddr)
 
290
                        advertiseAddr = addr.IP
 
291
                }
 
292
 
 
293
                // Use the port we are bound to.
 
294
                advertisePort = m.tcpListener.Addr().(*net.TCPAddr).Port
 
295
        }
 
296
 
 
297
        // Check if this is a public address without encryption
 
298
        addrStr := net.IP(advertiseAddr).String()
 
299
        if !isPrivateIP(addrStr) && !isLoopbackIP(addrStr) && !m.config.EncryptionEnabled() {
 
300
                m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!")
 
301
        }
 
302
 
 
303
        // Get the node meta data
 
304
        var meta []byte
 
305
        if m.config.Delegate != nil {
 
306
                meta = m.config.Delegate.NodeMeta(MetaMaxSize)
 
307
                if len(meta) > MetaMaxSize {
 
308
                        panic("Node meta data provided is longer than the limit")
 
309
                }
 
310
        }
 
311
 
 
312
        a := alive{
 
313
                Incarnation: m.nextIncarnation(),
 
314
                Node:        m.config.Name,
 
315
                Addr:        advertiseAddr,
 
316
                Port:        uint16(advertisePort),
 
317
                Meta:        meta,
 
318
                Vsn: []uint8{
 
319
                        ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
 
320
                        m.config.DelegateProtocolMin, m.config.DelegateProtocolMax,
 
321
                        m.config.DelegateProtocolVersion,
 
322
                },
 
323
        }
 
324
        m.aliveNode(&a, nil, true)
 
325
 
 
326
        return nil
 
327
}
 
328
 
 
329
// LocalNode is used to return the local Node
 
330
func (m *Memberlist) LocalNode() *Node {
 
331
        m.nodeLock.RLock()
 
332
        defer m.nodeLock.RUnlock()
 
333
        state := m.nodeMap[m.config.Name]
 
334
        return &state.Node
 
335
}
 
336
 
 
337
// UpdateNode is used to trigger re-advertising the local node. This is
 
338
// primarily used with a Delegate to support dynamic updates to the local
 
339
// meta data.  This will block until the update message is successfully
 
340
// broadcasted to a member of the cluster, if any exist or until a specified
 
341
// timeout is reached.
 
342
func (m *Memberlist) UpdateNode(timeout time.Duration) error {
 
343
        // Get the node meta data
 
344
        var meta []byte
 
345
        if m.config.Delegate != nil {
 
346
                meta = m.config.Delegate.NodeMeta(MetaMaxSize)
 
347
                if len(meta) > MetaMaxSize {
 
348
                        panic("Node meta data provided is longer than the limit")
 
349
                }
 
350
        }
 
351
 
 
352
        // Get the existing node
 
353
        m.nodeLock.RLock()
 
354
        state := m.nodeMap[m.config.Name]
 
355
        m.nodeLock.RUnlock()
 
356
 
 
357
        // Format a new alive message
 
358
        a := alive{
 
359
                Incarnation: m.nextIncarnation(),
 
360
                Node:        m.config.Name,
 
361
                Addr:        state.Addr,
 
362
                Port:        state.Port,
 
363
                Meta:        meta,
 
364
                Vsn: []uint8{
 
365
                        ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion,
 
366
                        m.config.DelegateProtocolMin, m.config.DelegateProtocolMax,
 
367
                        m.config.DelegateProtocolVersion,
 
368
                },
 
369
        }
 
370
        notifyCh := make(chan struct{})
 
371
        m.aliveNode(&a, notifyCh, true)
 
372
 
 
373
        // Wait for the broadcast or a timeout
 
374
        if m.anyAlive() {
 
375
                var timeoutCh <-chan time.Time
 
376
                if timeout > 0 {
 
377
                        timeoutCh = time.After(timeout)
 
378
                }
 
379
                select {
 
380
                case <-notifyCh:
 
381
                case <-timeoutCh:
 
382
                        return fmt.Errorf("timeout waiting for update broadcast")
 
383
                }
 
384
        }
 
385
        return nil
 
386
}
 
387
 
 
388
// SendTo is used to directly send a message to another node, without
 
389
// the use of the gossip mechanism. This will encode the message as a
 
390
// user-data message, which a delegate will receive through NotifyMsg
 
391
// The actual data is transmitted over UDP, which means this is a
 
392
// best-effort transmission mechanism, and the maximum size of the
 
393
// message is the size of a single UDP datagram, after compression.
 
394
// This method is DEPRECATED in favor or SendToUDP
 
395
func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
 
396
        // Encode as a user message
 
397
        buf := make([]byte, 1, len(msg)+1)
 
398
        buf[0] = byte(userMsg)
 
399
        buf = append(buf, msg...)
 
400
 
 
401
        // Send the message
 
402
        return m.rawSendMsgUDP(to, buf)
 
403
}
 
404
 
 
405
// SendToUDP is used to directly send a message to another node, without
 
406
// the use of the gossip mechanism. This will encode the message as a
 
407
// user-data message, which a delegate will receive through NotifyMsg
 
408
// The actual data is transmitted over UDP, which means this is a
 
409
// best-effort transmission mechanism, and the maximum size of the
 
410
// message is the size of a single UDP datagram, after compression
 
411
func (m *Memberlist) SendToUDP(to *Node, msg []byte) error {
 
412
        // Encode as a user message
 
413
        buf := make([]byte, 1, len(msg)+1)
 
414
        buf[0] = byte(userMsg)
 
415
        buf = append(buf, msg...)
 
416
 
 
417
        // Send the message
 
418
        destAddr := &net.UDPAddr{IP: to.Addr, Port: int(to.Port)}
 
419
        return m.rawSendMsgUDP(destAddr, buf)
 
420
}
 
421
 
 
422
// SendToTCP is used to directly send a message to another node, without
 
423
// the use of the gossip mechanism. This will encode the message as a
 
424
// user-data message, which a delegate will receive through NotifyMsg
 
425
// The actual data is transmitted over TCP, which means delivery
 
426
// is guaranteed if no error is returned. There is no limit
 
427
// to the size of the message
 
428
func (m *Memberlist) SendToTCP(to *Node, msg []byte) error {
 
429
        // Send the message
 
430
        destAddr := &net.TCPAddr{IP: to.Addr, Port: int(to.Port)}
 
431
        return m.sendTCPUserMsg(destAddr, msg)
 
432
}
 
433
 
 
434
// Members returns a list of all known live nodes. The node structures
 
435
// returned must not be modified. If you wish to modify a Node, make a
 
436
// copy first.
 
437
func (m *Memberlist) Members() []*Node {
 
438
        m.nodeLock.RLock()
 
439
        defer m.nodeLock.RUnlock()
 
440
 
 
441
        nodes := make([]*Node, 0, len(m.nodes))
 
442
        for _, n := range m.nodes {
 
443
                if n.State != stateDead {
 
444
                        nodes = append(nodes, &n.Node)
 
445
                }
 
446
        }
 
447
 
 
448
        return nodes
 
449
}
 
450
 
 
451
// NumMembers returns the number of alive nodes currently known. Between
 
452
// the time of calling this and calling Members, the number of alive nodes
 
453
// may have changed, so this shouldn't be used to determine how many
 
454
// members will be returned by Members.
 
455
func (m *Memberlist) NumMembers() (alive int) {
 
456
        m.nodeLock.RLock()
 
457
        defer m.nodeLock.RUnlock()
 
458
 
 
459
        for _, n := range m.nodes {
 
460
                if n.State != stateDead {
 
461
                        alive++
 
462
                }
 
463
        }
 
464
 
 
465
        return
 
466
}
 
467
 
 
468
// Leave will broadcast a leave message but will not shutdown the background
 
469
// listeners, meaning the node will continue participating in gossip and state
 
470
// updates.
 
471
//
 
472
// This will block until the leave message is successfully broadcasted to
 
473
// a member of the cluster, if any exist or until a specified timeout
 
474
// is reached.
 
475
//
 
476
// This method is safe to call multiple times, but must not be called
 
477
// after the cluster is already shut down.
 
478
func (m *Memberlist) Leave(timeout time.Duration) error {
 
479
        m.nodeLock.Lock()
 
480
        // We can't defer m.nodeLock.Unlock() because m.deadNode will also try to
 
481
        // acquire a lock so we need to Unlock before that.
 
482
 
 
483
        if m.shutdown {
 
484
                m.nodeLock.Unlock()
 
485
                panic("leave after shutdown")
 
486
        }
 
487
 
 
488
        if !m.leave {
 
489
                m.leave = true
 
490
 
 
491
                state, ok := m.nodeMap[m.config.Name]
 
492
                m.nodeLock.Unlock()
 
493
                if !ok {
 
494
                        m.logger.Printf("[WARN] memberlist: Leave but we're not in the node map.")
 
495
                        return nil
 
496
                }
 
497
 
 
498
                d := dead{
 
499
                        Incarnation: state.Incarnation,
 
500
                        Node:        state.Name,
 
501
                }
 
502
                m.deadNode(&d)
 
503
 
 
504
                // Block until the broadcast goes out
 
505
                if m.anyAlive() {
 
506
                        var timeoutCh <-chan time.Time
 
507
                        if timeout > 0 {
 
508
                                timeoutCh = time.After(timeout)
 
509
                        }
 
510
                        select {
 
511
                        case <-m.leaveBroadcast:
 
512
                        case <-timeoutCh:
 
513
                                return fmt.Errorf("timeout waiting for leave broadcast")
 
514
                        }
 
515
                }
 
516
        } else {
 
517
                m.nodeLock.Unlock()
 
518
        }
 
519
 
 
520
        return nil
 
521
}
 
522
 
 
523
// Check for any other alive node.
 
524
func (m *Memberlist) anyAlive() bool {
 
525
        m.nodeLock.RLock()
 
526
        defer m.nodeLock.RUnlock()
 
527
        for _, n := range m.nodes {
 
528
                if n.State != stateDead && n.Name != m.config.Name {
 
529
                        return true
 
530
                }
 
531
        }
 
532
        return false
 
533
}
 
534
 
 
535
// ProtocolVersion returns the protocol version currently in use by
 
536
// this memberlist.
 
537
func (m *Memberlist) ProtocolVersion() uint8 {
 
538
        // NOTE: This method exists so that in the future we can control
 
539
        // any locking if necessary, if we change the protocol version at
 
540
        // runtime, etc.
 
541
        return m.config.ProtocolVersion
 
542
}
 
543
 
 
544
// Shutdown will stop any background maintanence of network activity
 
545
// for this memberlist, causing it to appear "dead". A leave message
 
546
// will not be broadcasted prior, so the cluster being left will have
 
547
// to detect this node's shutdown using probing. If you wish to more
 
548
// gracefully exit the cluster, call Leave prior to shutting down.
 
549
//
 
550
// This method is safe to call multiple times.
 
551
func (m *Memberlist) Shutdown() error {
 
552
        m.nodeLock.Lock()
 
553
        defer m.nodeLock.Unlock()
 
554
 
 
555
        if m.shutdown {
 
556
                return nil
 
557
        }
 
558
 
 
559
        m.shutdown = true
 
560
        close(m.shutdownCh)
 
561
        m.deschedule()
 
562
        m.udpListener.Close()
 
563
        m.tcpListener.Close()
 
564
        return nil
 
565
}