~xavi-garcia-mena/keeper/owncloud-test

« back to all changes in this revision

Viewing changes to src/helper/restore-helper.cpp

  • Committer: Xavi Garcia Mena
  • Date: 2016-11-17 12:59:44 UTC
  • Revision ID: xavi.garcia.mena@canonical.com-20161117125944-xt47ulvg2lscwpx6
Changed the way we create the socket between keeper and restore helper

Show diffs side-by-side

added added

removed removed

Lines of Context:
31
31
#include <QTimer>
32
32
#include <QVector>
33
33
#include <QCryptographicHash>
 
34
#include <QLocalServer>
 
35
#include <QSharedPointer>
34
36
 
35
37
#include <fcntl.h>
36
38
#include <sys/types.h>
47
49
        RestoreHelper* backup_helper
48
50
    )
49
51
        : q_ptr(backup_helper)
 
52
        , server_(new QLocalServer)
50
53
    {
51
54
        // listen for inactivity from storage framework
52
55
        QObject::connect(&timer_, &QTimer::timeout,
53
56
            std::bind(&RestoreHelperPrivate::on_inactivity_detected, this)
54
57
        );
55
58
 
 
59
        QObject::connect(&timer2_, &QTimer::timeout,
 
60
            std::bind(&RestoreHelperPrivate::delay_data, this)
 
61
        );
 
62
 
56
63
        // fire up the sockets
57
64
        int fds[2];
58
65
        int rc = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, fds);
64
71
        }
65
72
 
66
73
        // helper socket is for the client.
67
 
        helper_socket_.setSocketDescriptor(fds[0], QLocalSocket::ConnectedState, QIODevice::ReadOnly);
68
 
 
69
 
        write_socket_.setSocketDescriptor(fds[1], QLocalSocket::ConnectedState, QIODevice::WriteOnly);
 
74
        helper_socket_.setSocketDescriptor(fds[1], QLocalSocket::ConnectedState, QIODevice::ReadOnly);
 
75
 
 
76
        write_socket_.setSocketDescriptor(fds[0], QLocalSocket::ConnectedState, QIODevice::WriteOnly);
 
77
 
 
78
 
 
79
        QObject::connect(server_.data(), &QLocalServer::newConnection, std::bind(&RestoreHelperPrivate::restore_ready, this));
 
80
 
 
81
        if (!server_->listen("test_helper"))
 
82
        {
 
83
            qWarning() << " GREP ERROR LISTENING";
 
84
        }
70
85
    }
71
86
 
72
87
    ~RestoreHelperPrivate() = default;
110
125
 
111
126
    void stop()
112
127
    {
 
128
        qDebug() << "GREP DISCONNECTING";
113
129
        write_socket_.disconnectFromServer();
 
130
        helper_conn_->disconnectFromServer();
114
131
        cancelled_ = true;
115
132
        q_ptr->Helper::stop();
116
133
    }
140
157
            case Helper::State::DATA_COMPLETE: {
141
158
                qDebug() << "Restore helper finished, calling downloader_.finish()";
142
159
                write_socket_.disconnectFromServer();
 
160
                helper_conn_->disconnectFromServer();
143
161
                downloader_->finish();
144
162
                downloader_.reset();
145
163
                break;
158
176
        check_for_done();
159
177
    }
160
178
 
 
179
    void restore_ready()
 
180
    {
 
181
        qDebug() << "GREP (((((((((((((((((((((((((((((((((((( HELPER IS READY: Starts sending data: TOTAL TO SEND: " << upload_buffer_.size();
 
182
        helper_is_ready_ = true;
 
183
 
 
184
        helper_conn_.reset(server_->nextPendingConnection());
 
185
 
 
186
        connections_.remember(QObject::connect(
 
187
            helper_conn_.data(), &QLocalSocket::bytesWritten,
 
188
            std::bind(&RestoreHelperPrivate::on_data_uploaded, this, std::placeholders::_1)
 
189
        ));
 
190
//        timer2_.start(10);
 
191
        send_data();
 
192
    }
 
193
 
161
194
private:
162
195
 
163
196
    void on_inactivity_detected()
167
200
        stop();
168
201
    }
169
202
 
 
203
    void send_data()
 
204
    {
 
205
        if (upload_buffer_.size())
 
206
        {
 
207
            int bytes_to_send = upload_buffer_.size() < UPLOAD_BUFFER_MAX_ ? upload_buffer_.size() : UPLOAD_BUFFER_MAX_;
 
208
            // try to empty the upload buf
 
209
            const auto n = helper_conn_->write(upload_buffer_, bytes_to_send);
 
210
            if (n > 0) {
 
211
                // THIS IS JUST FOR EXTRA DEBUG INFORMATION
 
212
                QCryptographicHash hash(QCryptographicHash::Sha1);
 
213
                hash.addData(upload_buffer_.left(100));
 
214
//                qDebug() << "GREP ************************************************ Hash send: " << hash.result().toHex() << " Size: " << n << " Total: " << upload_buffer_.size();
 
215
//                // THIS IS JUST FOR EXTRA DEBUG INFORMATION
 
216
                upload_buffer_.remove(0, int(n));
 
217
                qDebug() << "GREP ************************************************ Hash send: " << hash.result().toHex() << " Size: " << n << " Total: " << upload_buffer_.size();
 
218
                                // THIS IS JUST FOR EXTRA DEBUG INFORMATION
 
219
                waiting_for_data_to_be_written_ = true;
 
220
            }
 
221
            else {
 
222
                if (n < 0) {
 
223
                    write_error_ = true;
 
224
                    qWarning() << "Write error:" << write_socket_.errorString();
 
225
                    stop();
 
226
                }
 
227
            }
 
228
//            helper_conn_->flush();
 
229
//            timer2_.start(10);
 
230
        }
 
231
    }
 
232
 
170
233
    void on_ready_read()
171
234
    {
172
235
        process_more();
174
237
 
175
238
    void on_data_uploaded(qint64 n)
176
239
    {
 
240
        waiting_for_data_to_be_written_ = false;
177
241
        n_uploaded_ += n;
178
242
        q_ptr->record_data_transferred(n);
179
 
        qDebug("n_read %zu n_uploaded %zu (newly uploaded %zu)", size_t(n_read_), size_t(n_uploaded_), size_t(n));
180
 
        process_more();
 
243
 
 
244
        qDebug() << "GREP ====================================== Bytes uploaded: " << n << "Total: " << n_uploaded_;
 
245
        send_data();
 
246
//        qDebug("n_read %zu n_uploaded %zu (newly uploaded %zu)", size_t(n_read_), size_t(n_uploaded_), size_t(n));
 
247
//        process_more();
 
248
        check_for_done();
 
249
    }
 
250
 
 
251
    void delay_data()
 
252
    {
 
253
        send_data();
 
254
        //        qDebug("n_read %zu n_uploaded %zu (newly uploaded %zu)", size_t(n_read_), size_t(n_uploaded_), size_t(n));
 
255
        //        process_more();
181
256
        check_for_done();
182
257
    }
183
258
 
184
259
    void process_more()
185
260
    {
186
 
        qDebug() << "RestoreHelper::process_more()";
187
261
        if (!downloader_)
188
262
            return;
189
 
        qDebug() << "RestoreHelper::process_more() 2";
190
263
        char readbuf[UPLOAD_BUFFER_MAX_];
191
264
        auto socket = downloader_->socket();
192
265
        for(;;)
194
267
            if (!socket->bytesAvailable())
195
268
                break;
196
269
            // try to fill the upload buf
197
 
            int max_bytes = (UPLOAD_BUFFER_MAX_) - upload_buffer_.size();
 
270
//            int max_bytes = UPLOAD_BUFFER_MAX_ - upload_buffer_.size();
 
271
            int max_bytes = UPLOAD_BUFFER_MAX_;
198
272
            if (max_bytes > 0) {
199
273
                const auto n = socket->read(readbuf, max_bytes);
200
274
                if (n > 0) {
201
275
                    n_read_ += n;
202
276
                    upload_buffer_.append(readbuf, int(n));
 
277
                    QCryptographicHash hash(QCryptographicHash::Sha1);
 
278
                    hash.addData(readbuf, 100);
 
279
                    qDebug() << "GREP ########################### READ BYTES: " << n << "Total: " << n_read_ << " Hash: " << hash.result().toHex();
203
280
                    qDebug("upload_buffer_.size() is %zu after reading %zu from helper", size_t(upload_buffer_.size()), size_t(n));
204
281
                }
205
282
                else if (n < 0) {
210
287
                }
211
288
            }
212
289
 
213
 
            // THIS IS JUST FOR EXTRA DEBUG INFORMATION
214
 
            QCryptographicHash hash(QCryptographicHash::Sha1);
215
 
            hash.addData(upload_buffer_.left(100));
216
 
            qDebug() << "************************************************ Hash send: " << hash.result().toHex() << " Size: " << upload_buffer_.size() << " Total: " << n_read_;
217
 
            // THIS IS JUST FOR EXTRA DEBUG INFORMATION
 
290
            if (helper_is_ready_ && ! waiting_for_data_to_be_written_)
 
291
            {
 
292
                send_data();
 
293
            }
218
294
 
219
 
            // try to empty the upload buf
220
 
            const auto n = write_socket_.write(upload_buffer_);
221
 
            if (n > 0) {
222
 
                upload_buffer_.remove(0, int(n));
223
 
                qDebug("upload_buffer_.size() is %zu after writing %zu to cloud", size_t(upload_buffer_.size()), size_t(n));
224
 
                continue;
225
 
            }
226
 
            else {
227
 
                if (n < 0) {
228
 
                    write_error_ = true;
229
 
                    qWarning() << "Write error:" << write_socket_.errorString();
230
 
                    stop();
231
 
                }
232
 
                break;
233
 
            }
 
295
//            // THIS IS JUST FOR EXTRA DEBUG INFORMATION
 
296
//            QCryptographicHash hash(QCryptographicHash::Sha1);
 
297
//            hash.addData(upload_buffer_.left(100));
 
298
//            qDebug() << "************************************************ Hash send: " << hash.result().toHex() << " Size: " << upload_buffer_.size() << " Total: " << n_read_;
 
299
//            // THIS IS JUST FOR EXTRA DEBUG INFORMATION
 
300
//
 
301
//            // try to empty the upload buf
 
302
//            const auto n = write_socket_.write(upload_buffer_);
 
303
//            if (n > 0) {
 
304
//                upload_buffer_.remove(0, int(n));
 
305
//                qDebug("upload_buffer_.size() is %zu after writing %zu to cloud", size_t(upload_buffer_.size()), size_t(n));
 
306
//                continue;
 
307
//            }
 
308
//            else {
 
309
//                if (n < 0) {
 
310
//                    write_error_ = true;
 
311
//                    qWarning() << "Write error:" << write_socket_.errorString();
 
312
//                    stop();
 
313
//                }
 
314
//                break;
 
315
//            }
234
316
        }
235
317
 
236
318
        reset_inactivity_timer();
250
332
    void check_for_done()
251
333
    {
252
334
        qDebug() << "Checking for done.";
 
335
        qDebug() << "Expected: " << q_ptr->expected_size() << " Uploaded: " << n_uploaded_ << " Read: " << n_read_;
253
336
        if (cancelled_)
254
337
        {
255
338
            q_ptr->set_state(Helper::State::CANCELLED);
286
369
 
287
370
    RestoreHelper * const q_ptr;
288
371
    QTimer timer_;
 
372
    QTimer timer2_;
289
373
    std::shared_ptr<Downloader> downloader_;
290
374
    QLocalSocket helper_socket_;
291
375
    QLocalSocket write_socket_;
297
381
    bool cancelled_ = false;
298
382
    ConnectionHelper connections_;
299
383
    QString uploader_committed_file_name_;
 
384
 
 
385
    bool helper_is_ready_ = false;
 
386
    bool waiting_for_data_to_be_written_ = false;
 
387
 
 
388
    QSharedPointer<QLocalServer> server_;
 
389
    QSharedPointer<QLocalSocket> helper_conn_;
300
390
};
301
391
 
302
392
/***
372
462
    Helper::on_helper_finished();
373
463
    d->on_helper_finished();
374
464
}
 
465
 
 
466
void RestoreHelper::restore_ready()
 
467
{
 
468
    Q_D(RestoreHelper);
 
469
 
 
470
    d->restore_ready();
 
471
}