46
48
ioError = "io-error"
47
49
invalidRequest = "invalid-request"
48
unknownChannel = "unknown channel"
50
unknownChannel = "unknown-channel"
49
51
unavailable = "unavailable"
50
52
internalError = "internal"
97
99
"Missing data field",
101
ErrInvalidExpiration = &APIError{
102
http.StatusBadRequest,
104
"Invalid expiration date",
106
ErrPastExpiration = &APIError{
107
http.StatusBadRequest,
109
"Past expiration date",
99
111
ErrUnknownChannel = &APIError{
100
112
http.StatusBadRequest,
121
ErrStoreUnavailable = &APIError{
122
http.StatusServiceUnavailable,
124
"Message store unavailable",
109
126
ErrCouldNotStoreNotification = &APIError{
110
127
http.StatusServiceUnavailable,
122
139
// Broadcast request JSON object.
123
140
type Broadcast struct {
124
Channel string `json:"channel"`
125
ExpireAfter uint8 `json:"expire_after"`
126
Data json.RawMessage `json:"data"`
141
Channel string `json:"channel"`
142
ExpireOn string `json:"expire_on"`
143
Data json.RawMessage `json:"data"`
129
func respondError(writer http.ResponseWriter, apiErr *APIError) {
146
// RespondError writes back a JSON error response for an APIError.
147
func RespondError(writer http.ResponseWriter, apiErr *APIError) {
130
148
wireError, err := json.Marshal(apiErr)
132
150
panic(fmt.Errorf("couldn't marshal our own errors: %v", err))
136
154
writer.Write(wireError)
139
func checkContentLength(request *http.Request) *APIError {
157
func checkContentLength(request *http.Request, maxBodySize int64) *APIError {
140
158
if request.ContentLength == -1 {
141
159
return ErrNoContentLengthProvided
143
161
if request.ContentLength == 0 {
144
162
return ErrRequestBodyEmpty
146
if request.ContentLength > MaxRequestBodyBytes {
164
if request.ContentLength > maxBodySize {
147
165
return ErrRequestBodyTooLarge
152
func checkRequestAsPost(request *http.Request) *APIError {
153
if err := checkContentLength(request); err != nil {
170
func checkRequestAsPost(request *http.Request, maxBodySize int64) *APIError {
171
if err := checkContentLength(request, maxBodySize); err != nil {
156
174
if request.Header.Get("Content-Type") != JSONMediaType {
165
func readBody(request *http.Request) ([]byte, *APIError) {
166
if err := checkRequestAsPost(request); err != nil {
183
// ReadBody checks that a POST request is well-formed and reads its body.
184
func ReadBody(request *http.Request, maxBodySize int64) ([]byte, *APIError) {
185
if err := checkRequestAsPost(request, maxBodySize); err != nil {
180
func checkBroadcast(bcast *Broadcast) *APIError {
199
// zeroTime is the zero time.Time value, used as the time result when
// checkBroadcast returns an error.
var zeroTime time.Time
201
func checkBroadcast(bcast *Broadcast) (time.Time, *APIError) {
181
202
if len(bcast.Data) == 0 {
182
return ErrMissingData
187
// state holds the interfaces to delegate to when serving requests
189
store store.PendingStore
190
broker broker.BrokerSending
194
type BroadcastHandler state
196
func (h *BroadcastHandler) doBroadcast(bcast *Broadcast) *APIError {
197
apiErr := checkBroadcast(bcast)
203
return zeroTime, ErrMissingData
205
expire, err := time.Parse(time.RFC3339, bcast.ExpireOn)
207
return zeroTime, ErrInvalidExpiration
209
if expire.Before(time.Now()) {
210
return zeroTime, ErrPastExpiration
215
// StoreForRequest is the type of a function that, given the response
// writer and the incoming request, returns the store.PendingStore to use
// for that request, or an error.
type StoreForRequest func(w http.ResponseWriter, request *http.Request) (store.PendingStore, error)
217
// context holds the interfaces to delegate to when serving requests
218
type context struct {
219
storeForRequest StoreForRequest
220
broker broker.BrokerSending
224
func (ctx *context) getStore(w http.ResponseWriter, request *http.Request) (store.PendingStore, *APIError) {
225
sto, err := ctx.storeForRequest(w, request)
227
apiErr, ok := err.(*APIError)
231
ctx.logger.Errorf("failed to get store: %v", err)
232
return nil, ErrUnknown
237
type BroadcastHandler struct {
241
func (h *BroadcastHandler) doBroadcast(sto store.PendingStore, bcast *Broadcast) *APIError {
242
expire, apiErr := checkBroadcast(bcast)
198
243
if apiErr != nil {
201
chanId, err := h.store.GetInternalChannelId(bcast.Channel)
246
chanId, err := sto.GetInternalChannelId(bcast.Channel)
204
249
case store.ErrUnknownChannel:
207
252
return ErrUnknown
210
// xxx ignoring expiration for now
211
err = h.store.AppendToChannel(chanId, bcast.Data)
255
err = sto.AppendToChannel(chanId, bcast.Data, expire)
213
// assume this for now
257
h.logger.Errorf("could not store notification: %v", err)
214
258
return ErrCouldNotStoreNotification
221
265
func (h *BroadcastHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
222
body, apiErr := readBody(request)
225
respondError(writer, apiErr)
269
RespondError(writer, apiErr)
273
body, apiErr := ReadBody(request, MaxRequestBodyBytes)
278
sto, apiErr := h.getStore(writer, request)
229
284
broadcast := &Broadcast{}
230
285
err := json.Unmarshal(body, broadcast)
233
respondError(writer, ErrMalformedJSONObject)
287
apiErr = ErrMalformedJSONObject
237
apiErr = h.doBroadcast(broadcast)
291
apiErr = h.doBroadcast(sto, broadcast)
238
292
if apiErr != nil {
239
respondError(writer, apiErr)
247
300
// MakeHandlersMux makes a handler that dispatches for the various API endpoints.
248
func MakeHandlersMux(store store.PendingStore, broker broker.BrokerSending, logger logger.Logger) http.Handler {
301
func MakeHandlersMux(storeForRequest StoreForRequest, broker broker.BrokerSending, logger logger.Logger) *http.ServeMux {
303
storeForRequest: storeForRequest,
249
307
mux := http.NewServeMux()
250
mux.Handle("/broadcast", &BroadcastHandler{
308
mux.Handle("/broadcast", &BroadcastHandler{context: ctx})