~ubuntu-branches/ubuntu/vivid/ctdb/vivid-proposed

« back to all changes in this revision

Viewing changes to common/ctdb_io.c

  • Committer: Package Import Robot
  • Author(s): Mathieu Parent
  • Date: 2011-11-06 15:18:59 UTC
  • mto: (1.3.1)
  • mto: This revision was merged to the branch mainline in revision 22.
  • Revision ID: package-import@ubuntu.com-20111106151859-84nk51h3enndlo4q
Tags: upstream-1.11+git20111102
Import upstream version 1.11+git20111102

Show diffs side-by-side

added added

removed removed

Lines of Context:
81
81
 
82
82
/*
83
83
  called when an incoming connection is readable
 
84
  This function MUST be safe for reentry via the queue callback!
84
85
*/
85
86
static void queue_io_read(struct ctdb_queue *queue)
86
87
{
87
88
        int num_ready = 0;
88
 
        ssize_t nread, totread, partlen;
89
 
        uint8_t *data, *data_base;
 
89
        uint32_t sz_bytes_req;
 
90
        uint32_t pkt_size;
 
91
        uint32_t pkt_bytes_remaining;
 
92
        uint32_t to_read;
 
93
        ssize_t nread;
 
94
        uint8_t *data;
90
95
 
91
96
        if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
92
97
                return;
96
101
                goto failed;
97
102
        }
98
103
 
99
 
 
100
 
        queue->partial.data = talloc_realloc_size(queue, queue->partial.data, 
101
 
                                                  num_ready + queue->partial.length);
102
 
 
103
104
        if (queue->partial.data == NULL) {
104
 
                DEBUG(DEBUG_ERR,("%s: read error alloc failed for %u\n",
105
 
                        queue->name, num_ready + queue->partial.length));
 
105
                /* starting fresh, allocate buf for size bytes */
 
106
                sz_bytes_req = sizeof(pkt_size);
 
107
                queue->partial.data = talloc_size(queue, sz_bytes_req);
 
108
                if (queue->partial.data == NULL) {
 
109
                        DEBUG(DEBUG_ERR,("read error alloc failed for %u\n",
 
110
                                         sz_bytes_req));
 
111
                        goto failed;
 
112
                }
 
113
        } else if (queue->partial.length < sizeof(pkt_size)) {
 
114
                /* yet to find out the packet length */
 
115
                sz_bytes_req = sizeof(pkt_size) - queue->partial.length;
 
116
        } else {
 
117
                /* partial packet, length known, full buf allocated */
 
118
                sz_bytes_req = 0;
 
119
        }
 
120
        data = queue->partial.data;
 
121
 
 
122
        if (sz_bytes_req > 0) {
 
123
                to_read = MIN(sz_bytes_req, num_ready);
 
124
                nread = read(queue->fd, data + queue->partial.length,
 
125
                             to_read);
 
126
                if (nread <= 0) {
 
127
                        DEBUG(DEBUG_ERR,("read error nread=%d\n", (int)nread));
 
128
                        goto failed;
 
129
                }
 
130
                queue->partial.length += nread;
 
131
 
 
132
                if (nread < sz_bytes_req) {
 
133
                        /* not enough to know the length */
 
134
                        DEBUG(DEBUG_DEBUG,("Partial packet length read\n"));
 
135
                        return;
 
136
                }
 
137
                /* size now known, allocate buffer for the full packet */
 
138
                queue->partial.data = talloc_realloc_size(queue, data,
 
139
                                                          *(uint32_t *)data);
 
140
                if (queue->partial.data == NULL) {
 
141
                        DEBUG(DEBUG_ERR,("read error alloc failed for %u\n",
 
142
                                         *(uint32_t *)data));
 
143
                        goto failed;
 
144
                }
 
145
                data = queue->partial.data;
 
146
                num_ready -= nread;
 
147
        }
 
148
 
 
149
        pkt_size = *(uint32_t *)data;
 
150
        if (pkt_size == 0) {
 
151
                DEBUG(DEBUG_CRIT,("Invalid packet of length 0\n"));
106
152
                goto failed;
107
153
        }
108
154
 
109
 
        nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready);
 
155
        pkt_bytes_remaining = pkt_size - queue->partial.length;
 
156
        to_read = MIN(pkt_bytes_remaining, num_ready);
 
157
        nread = read(queue->fd, data + queue->partial.length,
 
158
                     to_read);
110
159
        if (nread <= 0) {
111
 
                DEBUG(DEBUG_ERR,("%s: read error nread=%d\n",
112
 
                                 queue->name, (int)nread));
 
160
                DEBUG(DEBUG_ERR,("read error nread=%d\n",
 
161
                                 (int)nread));
113
162
                goto failed;
114
163
        }
115
 
        totread = nread;
116
 
        partlen = queue->partial.length;
 
164
        queue->partial.length += nread;
117
165
 
118
 
        data = queue->partial.data;
119
 
        nread += queue->partial.length;
 
166
        if (queue->partial.length < pkt_size) {
 
167
                DEBUG(DEBUG_DEBUG,("Partial packet data read\n"));
 
168
                return;
 
169
        }
120
170
 
121
171
        queue->partial.data = NULL;
122
172
        queue->partial.length = 0;
123
 
 
124
 
        if (nread >= 4 && *(uint32_t *)data == nread) {
125
 
                /* it is the responsibility of the incoming packet
126
 
                 function to free 'data' */
127
 
                queue->callback(data, nread, queue->private_data);
128
 
                return;
129
 
        }
130
 
 
131
 
        data_base = data;
132
 
 
133
 
        while (nread >= 4 && *(uint32_t *)data <= nread) {
134
 
                /* we have at least one packet */
135
 
                uint8_t *d2;
136
 
                uint32_t len;
137
 
                bool destroyed = false;
138
 
 
139
 
                len = *(uint32_t *)data;
140
 
                if (len == 0) {
141
 
                        /* bad packet! treat as EOF */
142
 
                        DEBUG(DEBUG_CRIT,("%s: Invalid packet of length 0 (nread = %zu, totread = %zu, partlen = %zu)\n",
143
 
                                          queue->name, nread, totread, partlen));
144
 
                        dump_packet(data_base, totread + partlen);
145
 
                        goto failed;
146
 
                }
147
 
                d2 = talloc_memdup(queue, data, len);
148
 
                if (d2 == NULL) {
149
 
                        DEBUG(DEBUG_ERR,("%s: read error memdup failed for %u\n",
150
 
                                         queue->name, len));
151
 
                        /* sigh */
152
 
                        goto failed;
153
 
                }
154
 
 
155
 
                queue->destroyed = &destroyed;
156
 
                queue->callback(d2, len, queue->private_data);
157
 
                /* If callback freed us, don't do anything else. */
158
 
                if (destroyed) {
159
 
                        return;
160
 
                }
161
 
                queue->destroyed = NULL;
162
 
 
163
 
                data += len;
164
 
                nread -= len;           
165
 
        }
166
 
 
167
 
        if (nread > 0) {
168
 
                /* we have only part of a packet */
169
 
                if (data_base == data) {
170
 
                        queue->partial.data = data;
171
 
                        queue->partial.length = nread;
172
 
                } else {
173
 
                        queue->partial.data = talloc_memdup(queue, data, nread);
174
 
                        if (queue->partial.data == NULL) {
175
 
                                DEBUG(DEBUG_ERR,("%s: read error memdup partial failed for %u\n",
176
 
                                                 queue->name, (unsigned)nread));
177
 
                                goto failed;
178
 
                        }
179
 
                        queue->partial.length = nread;
180
 
                        talloc_free(data_base);
181
 
                }
182
 
                return;
183
 
        }
184
 
 
185
 
        talloc_free(data_base);
 
173
        /* it is the responsibility of the callback to free 'data' */
 
174
        queue->callback(data, pkt_size, queue->private_data);
186
175
        return;
187
176
 
188
177
failed: