2
* Copyright (C) 2014 Daniel Vrátil <dvratil@redhat.com>
4
* This library is free software; you can redistribute it and/or
5
* modify it under the terms of the GNU Lesser General Public
6
* License as published by the Free Software Foundation; either
7
* version 2.1 of the License, or (at your option) any later version.
9
* This library is distributed in the hope that it will be useful,
10
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12
* Lesser General Public License for more details.
14
* You should have received a copy of the GNU Lesser General Public
15
* License along with this library; if not, write to the Free Software
16
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20
#include "partstreamer.h"
21
#include "parthelper.h"
22
#include "parttypehelper.h"
23
#include "selectquerybuilder.h"
25
#include "connection.h"
26
#include "capabilities_p.h"
27
#include "imapstreamparser.h"
30
#include <libs/imapparser_p.h>
31
#include <libs/protocol_p.h>
32
#include <shared/akstandarddirs.h>
39
using namespace Akonadi;
40
using namespace Akonadi::Server;
42
PartStreamer::PartStreamer(Connection *connection, ImapStreamParser *parser,
43
const PimItem &pimItem, QObject *parent)
45
, mConnection(connection)
46
, mStreamParser(parser)
49
// Make sure the file_db_data path exists
50
AkStandardDirs::saveDir( "data", QLatin1String( "file_db_data" ) );
53
PartStreamer::~PartStreamer()
57
QByteArray PartStreamer::error() const
62
bool PartStreamer::streamNonliteral(Part& part, qint64& partSize, QByteArray& value)
64
value = mStreamParser->readString();
65
if (part.partType().ns() == QLatin1String("PLD")) {
66
partSize = value.size();
69
mDataChanged = (value.size() != part.datasize());
72
// only relevant for non-literals or non-external literals
73
// only fallback to data comparision if part already exists and sizes match
75
mDataChanged = (value != PartHelper::translateData(part));
80
PartHelper::update(&part, value, value.size());
82
// akDebug() << "insert from Store::handleLine: " << value.left(100);
84
part.setDatasize(value.size());
85
if (!PartHelper::insert(&part)) {
86
mError = "Unable to add item part";
95
bool PartStreamer::streamLiteral(Part& part, qint64& partSize, QByteArray& value)
97
const qint64 dataSize = mStreamParser->remainingLiteralSize();
98
if (part.partType().ns() == QLatin1String("PLD")) {
100
// Shortcut: if sizes differ, we don't need to compare data later no in order
101
// to detect whether the part has changed
103
mDataChanged = (partSize != part.datasize());
107
//actual case when streaming storage is used: external payload is enabled, data is big enough in a literal
108
const bool storeInFile = dataSize > DbConfig::configuredDatabase()->sizeThreshold();
110
return streamLiteralToFile(dataSize, part, value);
112
mStreamParser->sendContinuationResponse(dataSize);
113
//don't write in streaming way as the data goes to the database
114
while (!mStreamParser->atLiteralEnd()) {
115
value += mStreamParser->readLiteralPart();
117
if (part.isValid()) {
118
PartHelper::update(&part, value, value.size());
121
part.setDatasize(value.size());
122
if (!part.insert()) {
123
mError = "Failed to insert part to database";
133
bool PartStreamer::streamLiteralToFile(qint64 dataSize, Part &part, QByteArray &value)
135
const bool directStreaming = mConnection->capabilities().directStreaming();
138
if (!mDataChanged && mCheckChanged) {
139
origData = PartHelper::translateData(part);
142
if (directStreaming) {
143
bool ok = streamLiteralToFileDirectly(dataSize, part);
148
mStreamParser->sendContinuationResponse(dataSize);
150
// use first part as value for the initial insert into / update to the database.
151
// this will give us a proper filename to stream the rest of the parts contents into
152
// NOTE: we have to set the correct size (== dataSize) directly
153
value = mStreamParser->readLiteralPart();
154
// akDebug() << Q_FUNC_INFO << "VALUE in STORE: " << value << value.size() << dataSize;
156
if (part.isValid()) {
157
PartHelper::update(&part, value, dataSize);
159
part.setData( value );
160
part.setDatasize( dataSize );
161
if ( !PartHelper::insert( &part ) ) {
162
mError = "Unable to add item part";
167
//the actual streaming code for the remaining parts:
168
// reads from the parser, writes immediately to the file
169
QFile partFile(PartHelper::resolveAbsolutePath(part.data()));
171
PartHelper::streamToFile(mStreamParser, partFile, QIODevice::WriteOnly | QIODevice::Append);
172
} catch (const PartHelperException &e) {
178
if (mCheckChanged && !mDataChanged) {
179
// This is invoked only when part already exists, data sizes match and
180
// caller wants to know whether parts really differ
181
mDataChanged = (origData != PartHelper::translateData(part));
187
bool PartStreamer::streamLiteralToFileDirectly(qint64 dataSize, Part &part)
190
if (part.isValid()) {
191
if (part.external()) {
192
// Part was external and is still external
193
filename = QString::fromLatin1(part.data());
195
// Part wasn't external, but is now
196
filename = PartHelper::fileNameForPart(&part);
198
filename = PartHelper::updateFileNameRevision(filename);
202
QFileInfo finfo(filename);
203
if (finfo.isAbsolute()) {
204
filename = finfo.fileName();
207
part.setExternal(true);
208
part.setDatasize(dataSize);
209
part.setData(filename.toLatin1());
211
if (part.isValid()) {
212
if (!part.update()) {
213
mError = "Failed to update part in database";
217
if (!part.insert()) {
218
mError = "Failed to insert part into database";
222
filename = PartHelper::updateFileNameRevision(PartHelper::fileNameForPart(&part));
223
part.setData(filename.toLatin1());
228
response.setContinuation();
229
response.setString("STREAM [FILE " + part.data() + "]");
230
Q_EMIT responseAvailable(response);
232
const QByteArray reply = mStreamParser->peekString();
233
if (reply.startsWith("NO")) {
234
akError() << "Client failed to store payload into file";
235
mError = "Client failed to store payload into file";
239
QFile file(PartHelper::resolveAbsolutePath(filename.toLatin1()), this);
240
if (!file.exists()) {
241
akError() << "External payload file does not exist";
242
mError = "External payload file does not exist";
246
if (file.size() != dataSize) {
247
akError() << "Size mismatch! Client advertised" << dataSize << "bytes, but the file is" << file.size() << "bytes!";
248
mError = "Payload size mismatch";
256
bool PartStreamer::stream(const QByteArray &command, bool checkExists,
257
QByteArray &partName, qint64 &partSize, bool *changed)
262
mDataChanged = false;
263
mCheckChanged = (changed != 0);
268
ImapParser::splitVersionedKey(command, partName, partVersion);
269
const PartType partType = PartTypeHelper::fromFqName( partName );
273
if (checkExists || mCheckChanged) {
274
SelectQueryBuilder<Part> qb;
275
qb.addValueCondition(Part::pimItemIdColumn(), Query::Equals, mItem.id());
276
qb.addValueCondition(Part::partTypeIdColumn(), Query::Equals, partType.id());
278
mError = "Unable to check item part existence";
282
Part::List result = qb.result();
283
if (!result.isEmpty()) {
284
part = result.first();
288
// Shortcut: newly created parts are always "changed"
289
if (!part.isValid()) {
293
part.setPartType(partType);
294
part.setVersion(partVersion);
295
part.setPimItemId(mItem.id());
299
if (mStreamParser->hasLiteral(false)) {
300
ok = streamLiteral(part, partSize, value);
302
ok = streamNonliteral(part, partSize, value);
305
if (ok && mCheckChanged) {
306
*changed = mDataChanged;