~juju-qa/ubuntu/xenial/juju/xenial-2.0-beta3

« back to all changes in this revision

Viewing changes to src/github.com/juju/utils/multireader.go

  • Committer: Martin Packman
  • Date: 2016-03-30 19:31:08 UTC
  • mfrom: (1.1.41)
  • Revision ID: martin.packman@canonical.com-20160330193108-h9iz3ak334uk0z5r
Merge new upstream source 2.0~beta3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
package utils
 
2
 
 
3
import (
 
4
        "io"
 
5
        "sort"
 
6
 
 
7
        "github.com/juju/errors"
 
8
)
 
9
 
 
10
// SizeReaderAt combines io.ReaderAt with a Size method.
 
11
type SizeReaderAt interface {
 
12
        // Size returns the size of the data readable
 
13
        // from the reader.
 
14
        Size() int64
 
15
        io.ReaderAt
 
16
}
 
17
 
 
18
// NewMultiReaderAt is like io.MultiReader but produces a ReaderAt
 
19
// (and Size), instead of just a reader.
 
20
//
 
21
// Note: this implementation was taken from a talk given
 
22
// by Brad Fitzpatrick as OSCON 2013.
 
23
//
 
24
// http://talks.golang.org/2013/oscon-dl.slide#49
 
25
// https://github.com/golang/talks/blob/master/2013/oscon-dl/server-compose.go
 
26
func NewMultiReaderAt(parts ...SizeReaderAt) SizeReaderAt {
 
27
        m := &multiReaderAt{
 
28
                parts: make([]offsetAndSource, 0, len(parts)),
 
29
        }
 
30
        var off int64
 
31
        for _, p := range parts {
 
32
                m.parts = append(m.parts, offsetAndSource{off, p})
 
33
                off += p.Size()
 
34
        }
 
35
        m.size = off
 
36
        return m
 
37
}
 
38
 
 
39
type offsetAndSource struct {
 
40
        off int64
 
41
        SizeReaderAt
 
42
}
 
43
 
 
44
type multiReaderAt struct {
 
45
        parts []offsetAndSource
 
46
        size  int64
 
47
}
 
48
 
 
49
func (m *multiReaderAt) Size() int64 {
 
50
        return m.size
 
51
}
 
52
 
 
53
func (m *multiReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
 
54
        wantN := len(p)
 
55
 
 
56
        // Skip past the requested offset.
 
57
        skipParts := sort.Search(len(m.parts), func(i int) bool {
 
58
                // This function returns whether parts[i] will
 
59
                // contribute any bytes to our output.
 
60
                part := m.parts[i]
 
61
                return part.off+part.Size() > off
 
62
        })
 
63
        parts := m.parts[skipParts:]
 
64
 
 
65
        // How far to skip in the first part.
 
66
        needSkip := off
 
67
        if len(parts) > 0 {
 
68
                needSkip -= parts[0].off
 
69
        }
 
70
 
 
71
        for len(parts) > 0 && len(p) > 0 {
 
72
                readP := p
 
73
                partSize := parts[0].Size()
 
74
                if int64(len(readP)) > partSize-needSkip {
 
75
                        readP = readP[:partSize-needSkip]
 
76
                }
 
77
                pn, err0 := parts[0].ReadAt(readP, needSkip)
 
78
                if err0 != nil {
 
79
                        return n, err0
 
80
                }
 
81
                n += pn
 
82
                p = p[pn:]
 
83
                if int64(pn)+needSkip == partSize {
 
84
                        parts = parts[1:]
 
85
                }
 
86
                needSkip = 0
 
87
        }
 
88
 
 
89
        if n != wantN {
 
90
                err = io.ErrUnexpectedEOF
 
91
        }
 
92
        return
 
93
}
 
94
 
 
95
// NewMultiReaderSeeker returns an io.ReadSeeker that combines
 
96
// all the given readers into a single one. It assumes that
 
97
// all the seekers are initially positioned at the start.
 
98
func NewMultiReaderSeeker(readers ...io.ReadSeeker) io.ReadSeeker {
 
99
        sreaders := make([]SizeReaderAt, len(readers))
 
100
        for i, r := range readers {
 
101
                r1, err := newSizeReaderAt(r)
 
102
                if err != nil {
 
103
                        panic(err)
 
104
                }
 
105
                sreaders[i] = r1
 
106
        }
 
107
        return &readSeeker{
 
108
                r: NewMultiReaderAt(sreaders...),
 
109
        }
 
110
}
 
111
 
 
112
// newSizeReaderAt adapts an io.ReadSeeker to a SizeReaderAt.
 
113
// Note that it doesn't strictly adhere to the ReaderAt
 
114
// contract because it's not safe to call ReadAt concurrently.
 
115
// This doesn't matter because io.ReadSeeker doesn't
 
116
// need to be thread-safe and this is only used in that
 
117
// context.
 
118
func newSizeReaderAt(r io.ReadSeeker) (SizeReaderAt, error) {
 
119
        size, err := r.Seek(0, 2)
 
120
        if err != nil {
 
121
                return nil, err
 
122
        }
 
123
        return &sizeReaderAt{
 
124
                r:    r,
 
125
                size: size,
 
126
                off:  size,
 
127
        }, nil
 
128
}
 
129
 
 
130
// sizeReaderAt adapts an io.ReadSeeker to a SizeReaderAt.
 
131
type sizeReaderAt struct {
 
132
        r    io.ReadSeeker
 
133
        size int64
 
134
        off  int64
 
135
}
 
136
 
 
137
// ReadAt implemnts SizeReaderAt.ReadAt.
 
138
func (r *sizeReaderAt) ReadAt(buf []byte, off int64) (n int, err error) {
 
139
        if off != r.off {
 
140
                _, err = r.r.Seek(off, 0)
 
141
                if err != nil {
 
142
                        return 0, err
 
143
                }
 
144
                r.off = off
 
145
        }
 
146
        n, err = io.ReadFull(r.r, buf)
 
147
        r.off += int64(n)
 
148
        return n, err
 
149
}
 
150
 
 
151
// Size implemnts SizeReaderAt.Size.
 
152
func (r *sizeReaderAt) Size() int64 {
 
153
        return r.size
 
154
}
 
155
 
 
156
// readSeeker adapts a SizeReaderAt to an io.ReadSeeker.
 
157
type readSeeker struct {
 
158
        r   SizeReaderAt
 
159
        off int64
 
160
}
 
161
 
 
162
// Seek implements io.Seeker.Seek.
 
163
func (r *readSeeker) Seek(off int64, whence int) (int64, error) {
 
164
        switch whence {
 
165
        case 0:
 
166
        case 1:
 
167
                off += r.off
 
168
        case 2:
 
169
                off = r.r.Size() + off
 
170
        }
 
171
        if off < 0 {
 
172
                return 0, errors.New("negative position")
 
173
        }
 
174
        r.off = off
 
175
        return off, nil
 
176
}
 
177
 
 
178
// Read implements io.Reader.Read.
 
179
func (r *readSeeker) Read(buf []byte) (int, error) {
 
180
        n, err := r.r.ReadAt(buf, r.off)
 
181
        r.off += int64(n)
 
182
        if err == io.ErrUnexpectedEOF {
 
183
                err = io.EOF
 
184
        }
 
185
        return n, err
 
186
}