~ubuntu-branches/ubuntu/quantal/zfs-fuse/quantal

« back to all changes in this revision

Viewing changes to src/lib/libzpool/zil.c

  • Committer: Bazaar Package Importer
  • Author(s): Mike Hommey, Mike Hommey, Seth Heeren
  • Date: 2010-06-30 18:03:52 UTC
  • mfrom: (1.1.3 upstream)
  • Revision ID: james.westby@ubuntu.com-20100630180352-d3jq25ytbcl23q3y
Tags: 0.6.9-1
* New upstream release.

[ Mike Hommey ]
* debian/control:
  - Build depend on libssl-dev and libattr1-dev, now required to build.
  - Build depend on docbook-xml to avoid xsltproc I/O error loading
    docbook DTD.
  - Add suggestions for a NFS server and kpartx.
* debian/man/*, debian/copyright, debian/rules: Remove manual pages, they
  are now shipped upstream.
* debian/copyright: Change download link.
* src/SConstruct:
  - Add an optim option to the build system.
  - Add support for DESTDIR.
  - Force debug=1 to mean optim, no strip, no debug.
  - Use -ffunction-sections, -fdata-sections, and --gc-sections flags to
    reduce the binary sizes.
* src/lib/libumem/SConscript: Cleanup src/lib/libumem when cleaning up
  build directory.
* src/cmd/*/SConscript: Don't link zfs, zpool and zdb against libssl.
* src/lib/libumem/SConscript: Only build static libumem.
* src/lib/libumem/sol_compat.h:
  - Add atomic cas support for sparc.
  - Use atomic functions from libsolcompat in libumem on unsupported
    platforms.
* debian/rules:
  - Set optimization level in build system according to DEB_BUILD_OPTIONS.
  - Build with debug=1 to have unstripped binaries ; dh_strip will do the
    right thing.
  - Don't depend on the local location of the docbook XSLT stylesheets.
    Use the catalogged url in place of the full path.
  - Don't clean src/.sconsign.dblite and src/path.pyc.
  - Set all destination directories when installing with scons.
  - Install bash completion and zfsrc files.
  - Don't use scons cache when building.
* debian/prerm: Remove /var/lib/zfs/zpool.cache in prerm.
* debian/dirs: Create /etc/bash_completion.d.
* debian/watch: Fix watch file.
* debian/rules, debian/control, debian/compat: Switch to dh.
* debian/README.Debian: Update README.Debian.
* debian/zfs-fuse.man.xml: Update zfs-fuse manual page.
* debian/zfs-fuse.init: Start sharing datasets marked as such at daemon
  startup.
* debian/rules, debian/control: Use config.guess and config.sub from
  autotools-dev.

[ Seth Heeren ]
* debian/zfs-fuse.man.xml:
  Added notes on the precedence, zfsrc, commandline, initscript vs.
  /etc/default/zfs-fuse on some systems.
* debian/zfs-fuse.init, debian/zfs-fuse.default: Deprecating DAEMON_OPTS.
* debian/zfs-fuse.init:
  - Removing import -a -f.
  - Removing the now unnecessary 'sleep 2'.
  - Extended shutdown wait to allow for zfs-fuse daemon's own shutdown
    timeouts.
  - Re-ordered dubious PATH setting.
* debian/zfs-fuse.init: Move existing zpool.cache to new location if
  possible.

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
 * CDDL HEADER END
20
20
 */
21
21
/*
22
 
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 
22
 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
23
23
 * Use is subject to license terms.
24
24
 */
25
25
 
26
26
#include <sys/zfs_context.h>
27
27
#include <sys/spa.h>
28
 
#include <sys/spa_impl.h>
29
28
#include <sys/dmu.h>
30
29
#include <sys/zap.h>
31
30
#include <sys/arc.h>
77
76
 
78
77
static kmem_cache_t *zil_lwb_cache;
79
78
 
 
79
static boolean_t zil_empty(zilog_t *zilog);
 
80
 
 
81
#define LWB_EMPTY(lwb) ((BP_GET_LSIZE(&lwb->lwb_blk) - \
 
82
    sizeof (zil_chain_t)) == (lwb->lwb_sz - lwb->lwb_nused))
 
83
 
 
84
 
80
85
static int
81
 
zil_dva_compare(const void *x1, const void *x2)
 
86
zil_bp_compare(const void *x1, const void *x2)
82
87
{
83
 
        const dva_t *dva1 = x1;
84
 
        const dva_t *dva2 = x2;
 
88
        const dva_t *dva1 = &((zil_bp_node_t *)x1)->zn_dva;
 
89
        const dva_t *dva2 = &((zil_bp_node_t *)x2)->zn_dva;
85
90
 
86
91
        if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
87
92
                return (-1);
97
102
}
98
103
 
99
104
static void
100
 
zil_dva_tree_init(avl_tree_t *t)
 
105
zil_bp_tree_init(zilog_t *zilog)
101
106
{
102
 
        avl_create(t, zil_dva_compare, sizeof (zil_dva_node_t),
103
 
            offsetof(zil_dva_node_t, zn_node));
 
107
        avl_create(&zilog->zl_bp_tree, zil_bp_compare,
 
108
            sizeof (zil_bp_node_t), offsetof(zil_bp_node_t, zn_node));
104
109
}
105
110
 
106
111
static void
107
 
zil_dva_tree_fini(avl_tree_t *t)
 
112
zil_bp_tree_fini(zilog_t *zilog)
108
113
{
109
 
        zil_dva_node_t *zn;
 
114
        avl_tree_t *t = &zilog->zl_bp_tree;
 
115
        zil_bp_node_t *zn;
110
116
        void *cookie = NULL;
111
117
 
112
118
        while ((zn = avl_destroy_nodes(t, &cookie)) != NULL)
113
 
                kmem_free(zn, sizeof (zil_dva_node_t));
 
119
                kmem_free(zn, sizeof (zil_bp_node_t));
114
120
 
115
121
        avl_destroy(t);
116
122
}
117
123
 
118
 
static int
119
 
zil_dva_tree_add(avl_tree_t *t, dva_t *dva)
 
124
int
 
125
zil_bp_tree_add(zilog_t *zilog, const blkptr_t *bp)
120
126
{
121
 
        zil_dva_node_t *zn;
 
127
        avl_tree_t *t = &zilog->zl_bp_tree;
 
128
        const dva_t *dva = BP_IDENTITY(bp);
 
129
        zil_bp_node_t *zn;
122
130
        avl_index_t where;
123
131
 
124
132
        if (avl_find(t, dva, &where) != NULL)
125
133
                return (EEXIST);
126
134
 
127
 
        zn = kmem_alloc(sizeof (zil_dva_node_t), KM_SLEEP);
 
135
        zn = kmem_alloc(sizeof (zil_bp_node_t), KM_SLEEP);
128
136
        zn->zn_dva = *dva;
129
137
        avl_insert(t, zn, where);
130
138
 
149
157
}
150
158
 
151
159
/*
152
 
 * Read a log block, make sure it's valid, and byteswap it if necessary.
 
160
 * Read a log block and make sure it's valid.
153
161
 */
154
162
static int
155
 
zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, arc_buf_t **abufpp)
 
163
zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst,
 
164
    char **end)
156
165
{
157
 
        blkptr_t blk = *bp;
 
166
        enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
 
167
        uint32_t aflags = ARC_WAIT;
 
168
        arc_buf_t *abuf = NULL;
158
169
        zbookmark_t zb;
159
 
        uint32_t aflags = ARC_WAIT;
160
170
        int error;
161
171
 
162
 
        zb.zb_objset = bp->blk_cksum.zc_word[ZIL_ZC_OBJSET];
163
 
        zb.zb_object = 0;
164
 
        zb.zb_level = -1;
165
 
        zb.zb_blkid = bp->blk_cksum.zc_word[ZIL_ZC_SEQ];
166
 
 
167
 
        *abufpp = NULL;
168
 
 
169
 
        /*
170
 
         * We shouldn't be doing any scrubbing while we're doing log
171
 
         * replay, it's OK to not lock.
172
 
         */
173
 
        error = arc_read_nolock(NULL, zilog->zl_spa, &blk,
174
 
            arc_getbuf_func, abufpp, ZIO_PRIORITY_SYNC_READ, ZIO_FLAG_CANFAIL |
175
 
            ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB, &aflags, &zb);
 
172
        if (zilog->zl_header->zh_claim_txg == 0)
 
173
                zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
 
174
 
 
175
        if (!(zilog->zl_header->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
 
176
                zio_flags |= ZIO_FLAG_SPECULATIVE;
 
177
 
 
178
        SET_BOOKMARK(&zb, bp->blk_cksum.zc_word[ZIL_ZC_OBJSET],
 
179
            ZB_ZIL_OBJECT, ZB_ZIL_LEVEL, bp->blk_cksum.zc_word[ZIL_ZC_SEQ]);
 
180
 
 
181
        error = arc_read_nolock(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
 
182
            ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
176
183
 
177
184
        if (error == 0) {
178
 
                char *data = (*abufpp)->b_data;
179
 
                uint64_t blksz = BP_GET_LSIZE(bp);
180
 
                zil_trailer_t *ztp = (zil_trailer_t *)(data + blksz) - 1;
181
185
                zio_cksum_t cksum = bp->blk_cksum;
182
186
 
183
187
                /*
190
194
                 */
191
195
                cksum.zc_word[ZIL_ZC_SEQ]++;
192
196
 
193
 
                if (bcmp(&cksum, &ztp->zit_next_blk.blk_cksum,
194
 
                    sizeof (cksum)) || BP_IS_HOLE(&ztp->zit_next_blk) ||
195
 
                    (ztp->zit_nused > (blksz - sizeof (zil_trailer_t)))) {
196
 
                        error = ECKSUM;
197
 
                }
198
 
 
199
 
                if (error) {
200
 
                        VERIFY(arc_buf_remove_ref(*abufpp, abufpp) == 1);
201
 
                        *abufpp = NULL;
202
 
                }
203
 
        }
204
 
 
205
 
        dprintf("error %d on %llu:%llu\n", error, zb.zb_objset, zb.zb_blkid);
 
197
                if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
 
198
                        zil_chain_t *zilc = abuf->b_data;
 
199
                        char *lr = (char *)(zilc + 1);
 
200
                        uint64_t len = zilc->zc_nused - sizeof (zil_chain_t);
 
201
 
 
202
                        if (bcmp(&cksum, &zilc->zc_next_blk.blk_cksum,
 
203
                            sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk)) {
 
204
                                error = ECKSUM;
 
205
                        } else {
 
206
                                bcopy(lr, dst, len);
 
207
                                *end = (char *)dst + len;
 
208
                                *nbp = zilc->zc_next_blk;
 
209
                        }
 
210
                } else {
 
211
                        char *lr = abuf->b_data;
 
212
                        uint64_t size = BP_GET_LSIZE(bp);
 
213
                        zil_chain_t *zilc = (zil_chain_t *)(lr + size) - 1;
 
214
 
 
215
                        if (bcmp(&cksum, &zilc->zc_next_blk.blk_cksum,
 
216
                            sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk) ||
 
217
                            (zilc->zc_nused > (size - sizeof (*zilc)))) {
 
218
                                error = ECKSUM;
 
219
                        } else {
 
220
                                bcopy(lr, dst, zilc->zc_nused);
 
221
                                *end = (char *)dst + zilc->zc_nused;
 
222
                                *nbp = zilc->zc_next_blk;
 
223
                        }
 
224
                }
 
225
 
 
226
                VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
 
227
        }
 
228
 
 
229
        return (error);
 
230
}
 
231
 
 
232
/*
 
233
 * Read a TX_WRITE log data block.
 
234
 */
 
235
static int
 
236
zil_read_log_data(zilog_t *zilog, const lr_write_t *lr, void *wbuf)
 
237
{
 
238
        enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
 
239
        const blkptr_t *bp = &lr->lr_blkptr;
 
240
        uint32_t aflags = ARC_WAIT;
 
241
        arc_buf_t *abuf = NULL;
 
242
        zbookmark_t zb;
 
243
        int error;
 
244
 
 
245
        if (BP_IS_HOLE(bp)) {
 
246
                if (wbuf != NULL)
 
247
                        bzero(wbuf, MAX(BP_GET_LSIZE(bp), lr->lr_length));
 
248
                return (0);
 
249
        }
 
250
 
 
251
        if (zilog->zl_header->zh_claim_txg == 0)
 
252
                zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
 
253
 
 
254
        SET_BOOKMARK(&zb, dmu_objset_id(zilog->zl_os), lr->lr_foid,
 
255
            ZB_ZIL_LEVEL, lr->lr_offset / BP_GET_LSIZE(bp));
 
256
 
 
257
        error = arc_read_nolock(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
 
258
            ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
 
259
 
 
260
        if (error == 0) {
 
261
                if (wbuf != NULL)
 
262
                        bcopy(abuf->b_data, wbuf, arc_buf_size(abuf));
 
263
                (void) arc_buf_remove_ref(abuf, &abuf);
 
264
        }
206
265
 
207
266
        return (error);
208
267
}
209
268
 
210
269
/*
211
270
 * Parse the intent log, and call parse_func for each valid record within.
212
 
 * Return the highest sequence number.
213
271
 */
214
 
uint64_t
 
272
int
215
273
zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
216
274
    zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg)
217
275
{
218
276
        const zil_header_t *zh = zilog->zl_header;
219
 
        uint64_t claim_seq = zh->zh_claim_seq;
220
 
        uint64_t seq = 0;
221
 
        uint64_t max_seq = 0;
222
 
        blkptr_t blk = zh->zh_log;
223
 
        arc_buf_t *abuf;
 
277
        boolean_t claimed = !!zh->zh_claim_txg;
 
278
        uint64_t claim_blk_seq = claimed ? zh->zh_claim_blk_seq : UINT64_MAX;
 
279
        uint64_t claim_lr_seq = claimed ? zh->zh_claim_lr_seq : UINT64_MAX;
 
280
        uint64_t max_blk_seq = 0;
 
281
        uint64_t max_lr_seq = 0;
 
282
        uint64_t blk_count = 0;
 
283
        uint64_t lr_count = 0;
 
284
        blkptr_t blk, next_blk;
224
285
        char *lrbuf, *lrp;
225
 
        zil_trailer_t *ztp;
226
 
        int reclen, error;
 
286
        int error = 0;
227
287
 
228
 
        if (BP_IS_HOLE(&blk))
229
 
                return (max_seq);
 
288
        /*
 
289
         * Old logs didn't record the maximum zh_claim_lr_seq.
 
290
         */
 
291
        if (!(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
 
292
                claim_lr_seq = UINT64_MAX;
230
293
 
231
294
        /*
232
295
         * Starting at the block pointed to by zh_log we read the log chain.
237
300
         * If the log has been claimed, stop if we encounter a sequence
238
301
         * number greater than the highest claimed sequence number.
239
302
         */
240
 
        zil_dva_tree_init(&zilog->zl_dva_tree);
241
 
        for (;;) {
242
 
                seq = blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
243
 
 
244
 
                if (claim_seq != 0 && seq > claim_seq)
245
 
                        break;
246
 
 
247
 
                ASSERT(max_seq < seq);
248
 
                max_seq = seq;
249
 
 
250
 
                error = zil_read_log_block(zilog, &blk, &abuf);
251
 
 
252
 
                if (parse_blk_func != NULL)
253
 
                        parse_blk_func(zilog, &blk, arg, txg);
254
 
 
 
303
        lrbuf = zio_buf_alloc(SPA_MAXBLOCKSIZE);
 
304
        zil_bp_tree_init(zilog);
 
305
 
 
306
        for (blk = zh->zh_log; !BP_IS_HOLE(&blk); blk = next_blk) {
 
307
                uint64_t blk_seq = blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
 
308
                int reclen;
 
309
                char *end;
 
310
 
 
311
                if (blk_seq > claim_blk_seq)
 
312
                        break;
 
313
                if ((error = parse_blk_func(zilog, &blk, arg, txg)) != 0)
 
314
                        break;
 
315
                ASSERT3U(max_blk_seq, <, blk_seq);
 
316
                max_blk_seq = blk_seq;
 
317
                blk_count++;
 
318
 
 
319
                if (max_lr_seq == claim_lr_seq && max_blk_seq == claim_blk_seq)
 
320
                        break;
 
321
 
 
322
                error = zil_read_log_block(zilog, &blk, &next_blk, lrbuf, &end);
255
323
                if (error)
256
324
                        break;
257
325
 
258
 
                lrbuf = abuf->b_data;
259
 
                ztp = (zil_trailer_t *)(lrbuf + BP_GET_LSIZE(&blk)) - 1;
260
 
                blk = ztp->zit_next_blk;
261
 
 
262
 
                if (parse_lr_func == NULL) {
263
 
                        VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
264
 
                        continue;
265
 
                }
266
 
 
267
 
                for (lrp = lrbuf; lrp < lrbuf + ztp->zit_nused; lrp += reclen) {
 
326
                for (lrp = lrbuf; lrp < end; lrp += reclen) {
268
327
                        lr_t *lr = (lr_t *)lrp;
269
328
                        reclen = lr->lrc_reclen;
270
329
                        ASSERT3U(reclen, >=, sizeof (lr_t));
271
 
                        parse_lr_func(zilog, lr, arg, txg);
 
330
                        if (lr->lrc_seq > claim_lr_seq)
 
331
                                goto done;
 
332
                        if ((error = parse_lr_func(zilog, lr, arg, txg)) != 0)
 
333
                                goto done;
 
334
                        ASSERT3U(max_lr_seq, <, lr->lrc_seq);
 
335
                        max_lr_seq = lr->lrc_seq;
 
336
                        lr_count++;
272
337
                }
273
 
                VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
274
338
        }
275
 
        zil_dva_tree_fini(&zilog->zl_dva_tree);
276
 
 
277
 
        return (max_seq);
 
339
done:
 
340
        zilog->zl_parse_error = error;
 
341
        zilog->zl_parse_blk_seq = max_blk_seq;
 
342
        zilog->zl_parse_lr_seq = max_lr_seq;
 
343
        zilog->zl_parse_blk_count = blk_count;
 
344
        zilog->zl_parse_lr_count = lr_count;
 
345
 
 
346
        ASSERT(!claimed || !(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID) ||
 
347
            (max_blk_seq == claim_blk_seq && max_lr_seq == claim_lr_seq));
 
348
 
 
349
        zil_bp_tree_fini(zilog);
 
350
        zio_buf_free(lrbuf, SPA_MAXBLOCKSIZE);
 
351
 
 
352
        return (error);
278
353
}
279
354
 
280
 
/* ARGSUSED */
281
 
static void
 
355
static int
282
356
zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
283
357
{
284
 
        spa_t *spa = zilog->zl_spa;
285
 
        int err;
286
 
 
287
358
        /*
288
359
         * Claim log block if not already committed and not already claimed.
 
360
         * If tx == NULL, just verify that the block is claimable.
289
361
         */
290
 
        if (bp->blk_birth >= first_txg &&
291
 
            zil_dva_tree_add(&zilog->zl_dva_tree, BP_IDENTITY(bp)) == 0) {
292
 
                err = zio_wait(zio_claim(NULL, spa, first_txg, bp, NULL, NULL,
293
 
                    ZIO_FLAG_MUSTSUCCEED));
294
 
                ASSERT(err == 0);
295
 
        }
 
362
        if (bp->blk_birth < first_txg || zil_bp_tree_add(zilog, bp) != 0)
 
363
                return (0);
 
364
 
 
365
        return (zio_wait(zio_claim(NULL, zilog->zl_spa,
 
366
            tx == NULL ? 0 : first_txg, bp, spa_claim_notify, NULL,
 
367
            ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB)));
296
368
}
297
369
 
298
 
static void
 
370
static int
299
371
zil_claim_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
300
372
{
301
 
        if (lrc->lrc_txtype == TX_WRITE) {
302
 
                lr_write_t *lr = (lr_write_t *)lrc;
303
 
                zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg);
304
 
        }
 
373
        lr_write_t *lr = (lr_write_t *)lrc;
 
374
        int error;
 
375
 
 
376
        if (lrc->lrc_txtype != TX_WRITE)
 
377
                return (0);
 
378
 
 
379
        /*
 
380
         * If the block is not readable, don't claim it.  This can happen
 
381
         * in normal operation when a log block is written to disk before
 
382
         * some of the dmu_sync() blocks it points to.  In this case, the
 
383
         * transaction cannot have been committed to anyone (we would have
 
384
         * waited for all writes to be stable first), so it is semantically
 
385
         * correct to declare this the end of the log.
 
386
         */
 
387
        if (lr->lr_blkptr.blk_birth >= first_txg &&
 
388
            (error = zil_read_log_data(zilog, lr, NULL)) != 0)
 
389
                return (error);
 
390
        return (zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg));
305
391
}
306
392
 
307
393
/* ARGSUSED */
308
 
static void
 
394
static int
309
395
zil_free_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t claim_txg)
310
396
{
311
 
        zio_free_blk(zilog->zl_spa, bp, dmu_tx_get_txg(tx));
 
397
        zio_free_zil(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
 
398
 
 
399
        return (0);
312
400
}
313
401
 
314
 
static void
 
402
static int
315
403
zil_free_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t claim_txg)
316
404
{
 
405
        lr_write_t *lr = (lr_write_t *)lrc;
 
406
        blkptr_t *bp = &lr->lr_blkptr;
 
407
 
317
408
        /*
318
409
         * If we previously claimed it, we need to free it.
319
410
         */
320
 
        if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE) {
321
 
                lr_write_t *lr = (lr_write_t *)lrc;
322
 
                blkptr_t *bp = &lr->lr_blkptr;
323
 
                if (bp->blk_birth >= claim_txg &&
324
 
                    !zil_dva_tree_add(&zilog->zl_dva_tree, BP_IDENTITY(bp))) {
325
 
                        (void) arc_free(NULL, zilog->zl_spa,
326
 
                            dmu_tx_get_txg(tx), bp, NULL, NULL, ARC_WAIT);
327
 
                }
 
411
        if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE &&
 
412
            bp->blk_birth >= claim_txg && zil_bp_tree_add(zilog, bp) == 0)
 
413
                zio_free(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
 
414
 
 
415
        return (0);
 
416
}
 
417
 
 
418
static lwb_t *
 
419
zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, uint64_t txg)
 
420
{
 
421
        lwb_t *lwb;
 
422
 
 
423
        lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
 
424
        lwb->lwb_zilog = zilog;
 
425
        lwb->lwb_blk = *bp;
 
426
        lwb->lwb_buf = zio_buf_alloc(BP_GET_LSIZE(bp));
 
427
        lwb->lwb_max_txg = txg;
 
428
        lwb->lwb_zio = NULL;
 
429
        lwb->lwb_tx = NULL;
 
430
        if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
 
431
                lwb->lwb_nused = sizeof (zil_chain_t);
 
432
                lwb->lwb_sz = BP_GET_LSIZE(bp);
 
433
        } else {
 
434
                lwb->lwb_nused = 0;
 
435
                lwb->lwb_sz = BP_GET_LSIZE(bp) - sizeof (zil_chain_t);
328
436
        }
 
437
 
 
438
        mutex_enter(&zilog->zl_lock);
 
439
        list_insert_tail(&zilog->zl_lwb_list, lwb);
 
440
        mutex_exit(&zilog->zl_lock);
 
441
 
 
442
        return (lwb);
329
443
}
330
444
 
331
445
/*
332
446
 * Create an on-disk intent log.
333
447
 */
334
 
static void
 
448
static lwb_t *
335
449
zil_create(zilog_t *zilog)
336
450
{
337
451
        const zil_header_t *zh = zilog->zl_header;
338
 
        lwb_t *lwb;
 
452
        lwb_t *lwb = NULL;
339
453
        uint64_t txg = 0;
340
454
        dmu_tx_t *tx = NULL;
341
455
        blkptr_t blk;
352
466
        blk = zh->zh_log;
353
467
 
354
468
        /*
355
 
         * If we don't already have an initial log block or we have one
356
 
         * but it's the wrong endianness then allocate one.
 
469
         * Allocate an initial log block if:
 
470
         *    - there isn't one already
 
471
         *    - the existing block is the wrong endianess
357
472
         */
358
473
        if (BP_IS_HOLE(&blk) || BP_SHOULD_BYTESWAP(&blk)) {
359
474
                tx = dmu_tx_create(zilog->zl_os);
360
 
                (void) dmu_tx_assign(tx, TXG_WAIT);
 
475
                VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
361
476
                dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
362
477
                txg = dmu_tx_get_txg(tx);
363
478
 
364
479
                if (!BP_IS_HOLE(&blk)) {
365
 
                        zio_free_blk(zilog->zl_spa, &blk, txg);
 
480
                        zio_free_zil(zilog->zl_spa, txg, &blk);
366
481
                        BP_ZERO(&blk);
367
482
                }
368
483
 
369
 
                error = zio_alloc_blk(zilog->zl_spa, ZIL_MIN_BLKSZ, &blk,
370
 
                    NULL, txg);
 
484
                error = zio_alloc_zil(zilog->zl_spa, txg, &blk, NULL,
 
485
                    ZIL_MIN_BLKSZ, zilog->zl_logbias == ZFS_LOGBIAS_LATENCY);
371
486
 
372
487
                if (error == 0)
373
488
                        zil_init_log_chain(zilog, &blk);
376
491
        /*
377
492
         * Allocate a log write buffer (lwb) for the first log block.
378
493
         */
379
 
        if (error == 0) {
380
 
                lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
381
 
                lwb->lwb_zilog = zilog;
382
 
                lwb->lwb_blk = blk;
383
 
                lwb->lwb_nused = 0;
384
 
                lwb->lwb_sz = BP_GET_LSIZE(&lwb->lwb_blk);
385
 
                lwb->lwb_buf = zio_buf_alloc(lwb->lwb_sz);
386
 
                lwb->lwb_max_txg = txg;
387
 
                lwb->lwb_zio = NULL;
388
 
 
389
 
                mutex_enter(&zilog->zl_lock);
390
 
                list_insert_tail(&zilog->zl_lwb_list, lwb);
391
 
                mutex_exit(&zilog->zl_lock);
392
 
        }
 
494
        if (error == 0)
 
495
                lwb = zil_alloc_lwb(zilog, &blk, txg);
393
496
 
394
497
        /*
395
498
         * If we just allocated the first log block, commit our transaction
402
505
        }
403
506
 
404
507
        ASSERT(bcmp(&blk, &zh->zh_log, sizeof (blk)) == 0);
 
508
 
 
509
        return (lwb);
405
510
}
406
511
 
407
512
/*
426
531
         */
427
532
        txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
428
533
 
 
534
        zilog->zl_old_header = *zh;             /* debugging aid */
 
535
 
429
536
        if (BP_IS_HOLE(&zh->zh_log))
430
537
                return;
431
538
 
432
539
        tx = dmu_tx_create(zilog->zl_os);
433
 
        (void) dmu_tx_assign(tx, TXG_WAIT);
 
540
        VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
434
541
        dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
435
542
        txg = dmu_tx_get_txg(tx);
436
543
 
437
544
        mutex_enter(&zilog->zl_lock);
438
545
 
439
 
        /*
440
 
         * It is possible for the ZIL to get the previously mounted zilog
441
 
         * structure of the same dataset if quickly remounted and the dbuf
442
 
         * eviction has not completed. In this case we can see a non
443
 
         * empty lwb list and keep_first will be set. We fix this by
444
 
         * clearing the keep_first. This will be slower but it's very rare.
445
 
         */
446
 
        if (!list_is_empty(&zilog->zl_lwb_list) && keep_first)
447
 
                keep_first = B_FALSE;
448
 
 
449
546
        ASSERT3U(zilog->zl_destroy_txg, <, txg);
450
547
        zilog->zl_destroy_txg = txg;
451
548
        zilog->zl_keep_first = keep_first;
457
554
                        list_remove(&zilog->zl_lwb_list, lwb);
458
555
                        if (lwb->lwb_buf != NULL)
459
556
                                zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
460
 
                        zio_free_blk(zilog->zl_spa, &lwb->lwb_blk, txg);
 
557
                        zio_free_zil(zilog->zl_spa, txg, &lwb->lwb_blk);
461
558
                        kmem_cache_free(zil_lwb_cache, lwb);
462
559
                }
463
 
        } else {
464
 
                if (!keep_first) {
465
 
                        (void) zil_parse(zilog, zil_free_log_block,
466
 
                            zil_free_log_record, tx, zh->zh_claim_txg);
467
 
                }
 
560
        } else if (!keep_first) {
 
561
                (void) zil_parse(zilog, zil_free_log_block,
 
562
                    zil_free_log_record, tx, zh->zh_claim_txg);
468
563
        }
469
564
        mutex_exit(&zilog->zl_lock);
470
565
 
471
566
        dmu_tx_commit(tx);
472
567
}
473
568
 
474
 
/*
475
 
 * return true if the initial log block is not valid
476
 
 */
477
 
static boolean_t
478
 
zil_empty(zilog_t *zilog)
479
 
{
480
 
        const zil_header_t *zh = zilog->zl_header;
481
 
        arc_buf_t *abuf = NULL;
482
 
 
483
 
        if (BP_IS_HOLE(&zh->zh_log))
484
 
                return (B_TRUE);
485
 
 
486
 
        if (zil_read_log_block(zilog, &zh->zh_log, &abuf) != 0)
487
 
                return (B_TRUE);
488
 
 
489
 
        VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
490
 
        return (B_FALSE);
491
 
}
492
 
 
493
569
int
494
 
zil_claim(char *osname, void *txarg)
 
570
zil_claim(const char *osname, void *txarg)
495
571
{
496
572
        dmu_tx_t *tx = txarg;
497
573
        uint64_t first_txg = dmu_tx_get_txg(tx);
500
576
        objset_t *os;
501
577
        int error;
502
578
 
503
 
        error = dmu_objset_open(osname, DMU_OST_ANY, DS_MODE_USER, &os);
 
579
        error = dmu_objset_hold(osname, FTAG, &os);
504
580
        if (error) {
505
581
                cmn_err(CE_WARN, "can't open objset for %s", osname);
506
582
                return (0);
509
585
        zilog = dmu_objset_zil(os);
510
586
        zh = zil_header_in_syncing_context(zilog);
511
587
 
512
 
        if (zilog->zl_spa->spa_log_state == SPA_LOG_CLEAR) {
 
588
        if (spa_get_log_state(zilog->zl_spa) == SPA_LOG_CLEAR) {
513
589
                if (!BP_IS_HOLE(&zh->zh_log))
514
 
                        zio_free_blk(zilog->zl_spa, &zh->zh_log, first_txg);
 
590
                        zio_free_zil(zilog->zl_spa, first_txg, &zh->zh_log);
515
591
                BP_ZERO(&zh->zh_log);
516
592
                dsl_dataset_dirty(dmu_objset_ds(os), tx);
517
 
        }
518
 
 
519
 
        /*
520
 
         * Record here whether the zil has any records to replay.
521
 
         * If the header block pointer is null or the block points
522
 
         * to the stubby then we know there are no valid log records.
523
 
         * We use the header to store this state as the the zilog gets
524
 
         * freed later in dmu_objset_close().
525
 
         * The flags (and the rest of the header fields) are cleared in
526
 
         * zil_sync() as a result of a zil_destroy(), after replaying the log.
527
 
         *
528
 
         * Note, the intent log can be empty but still need the
529
 
         * stubby to be claimed.
530
 
         */
531
 
        if (!zil_empty(zilog)) {
532
 
                zh->zh_flags |= ZIL_REPLAY_NEEDED;
533
 
                dsl_dataset_dirty(dmu_objset_ds(os), tx);
 
593
                dmu_objset_rele(os, FTAG);
 
594
                return (0);
534
595
        }
535
596
 
536
597
        /*
542
603
         */
543
604
        ASSERT3U(zh->zh_claim_txg, <=, first_txg);
544
605
        if (zh->zh_claim_txg == 0 && !BP_IS_HOLE(&zh->zh_log)) {
 
606
                (void) zil_parse(zilog, zil_claim_log_block,
 
607
                    zil_claim_log_record, tx, first_txg);
545
608
                zh->zh_claim_txg = first_txg;
546
 
                zh->zh_claim_seq = zil_parse(zilog, zil_claim_log_block,
547
 
                    zil_claim_log_record, tx, first_txg);
 
609
                zh->zh_claim_blk_seq = zilog->zl_parse_blk_seq;
 
610
                zh->zh_claim_lr_seq = zilog->zl_parse_lr_seq;
 
611
                if (zilog->zl_parse_lr_count || zilog->zl_parse_blk_count > 1)
 
612
                        zh->zh_flags |= ZIL_REPLAY_NEEDED;
 
613
                zh->zh_flags |= ZIL_CLAIM_LR_SEQ_VALID;
548
614
                dsl_dataset_dirty(dmu_objset_ds(os), tx);
549
615
        }
550
616
 
551
617
        ASSERT3U(first_txg, ==, (spa_last_synced_txg(zilog->zl_spa) + 1));
552
 
        dmu_objset_close(os);
 
618
        dmu_objset_rele(os, FTAG);
553
619
        return (0);
554
620
}
555
621
 
558
624
 * Checksum errors are ok as they indicate the end of the chain.
559
625
 * Any other error (no device or read failure) returns an error.
560
626
 */
561
 
/* ARGSUSED */
562
627
int
563
 
zil_check_log_chain(char *osname, void *txarg)
 
628
zil_check_log_chain(const char *osname, void *tx)
564
629
{
565
630
        zilog_t *zilog;
566
 
        zil_header_t *zh;
567
 
        blkptr_t blk;
568
 
        arc_buf_t *abuf;
569
631
        objset_t *os;
570
 
        char *lrbuf;
571
 
        zil_trailer_t *ztp;
572
632
        int error;
573
633
 
574
 
        error = dmu_objset_open(osname, DMU_OST_ANY, DS_MODE_USER, &os);
 
634
        ASSERT(tx == NULL);
 
635
 
 
636
        error = dmu_objset_hold(osname, FTAG, &os);
575
637
        if (error) {
576
638
                cmn_err(CE_WARN, "can't open objset for %s", osname);
577
639
                return (0);
578
640
        }
579
641
 
580
642
        zilog = dmu_objset_zil(os);
581
 
        zh = zil_header_in_syncing_context(zilog);
582
 
        blk = zh->zh_log;
583
 
        if (BP_IS_HOLE(&blk)) {
584
 
                dmu_objset_close(os);
585
 
                return (0); /* no chain */
586
 
        }
587
 
 
588
 
        for (;;) {
589
 
                error = zil_read_log_block(zilog, &blk, &abuf);
590
 
                if (error)
591
 
                        break;
592
 
                lrbuf = abuf->b_data;
593
 
                ztp = (zil_trailer_t *)(lrbuf + BP_GET_LSIZE(&blk)) - 1;
594
 
                blk = ztp->zit_next_blk;
595
 
                VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
596
 
        }
597
 
        dmu_objset_close(os);
598
 
        if (error == ECKSUM)
599
 
                return (0); /* normal end of chain */
600
 
        return (error);
 
643
 
 
644
        /*
 
645
         * Because tx == NULL, zil_claim_log_block() will not actually claim
 
646
         * any blocks, but just determine whether it is possible to do so.
 
647
         * In addition to checking the log chain, zil_claim_log_block()
 
648
         * will invoke zio_claim() with a done func of spa_claim_notify(),
 
649
         * which will update spa_max_claim_txg.  See spa_load() for details.
 
650
         */
 
651
        error = zil_parse(zilog, zil_claim_log_block, zil_claim_log_record, tx,
 
652
            zilog->zl_header->zh_claim_txg ? -1ULL : spa_first_txg(os->os_spa));
 
653
 
 
654
        dmu_objset_rele(os, FTAG);
 
655
 
 
656
        return ((error == ECKSUM || error == ENOENT) ? 0 : error);
601
657
}
602
658
 
603
659
static int
615
671
}
616
672
 
617
673
void
618
 
zil_add_block(zilog_t *zilog, blkptr_t *bp)
 
674
zil_add_block(zilog_t *zilog, const blkptr_t *bp)
619
675
{
620
676
        avl_tree_t *t = &zilog->zl_vdev_tree;
621
677
        avl_index_t where;
691
747
{
692
748
        lwb_t *lwb = zio->io_private;
693
749
        zilog_t *zilog = lwb->lwb_zilog;
 
750
        dmu_tx_t *tx = lwb->lwb_tx;
694
751
 
695
752
        ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
696
 
        ASSERT(BP_GET_CHECKSUM(zio->io_bp) == ZIO_CHECKSUM_ZILOG);
697
753
        ASSERT(BP_GET_TYPE(zio->io_bp) == DMU_OT_INTENT_LOG);
698
754
        ASSERT(BP_GET_LEVEL(zio->io_bp) == 0);
699
755
        ASSERT(BP_GET_BYTEORDER(zio->io_bp) == ZFS_HOST_BYTEORDER);
712
768
        zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
713
769
        mutex_enter(&zilog->zl_lock);
714
770
        lwb->lwb_buf = NULL;
715
 
        if (zio->io_error)
716
 
                zilog->zl_log_error = B_TRUE;
 
771
        lwb->lwb_tx = NULL;
717
772
        mutex_exit(&zilog->zl_lock);
718
773
 
719
774
        /*
721
776
         * to the next block in the chain, so it's OK to let the txg in
722
777
         * which we allocated the next block sync.
723
778
         */
724
 
        txg_rele_to_sync(&lwb->lwb_txgh);
 
779
        dmu_tx_commit(tx);
725
780
}
726
781
 
727
782
/*
732
787
{
733
788
        zbookmark_t zb;
734
789
 
735
 
        zb.zb_objset = lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET];
736
 
        zb.zb_object = 0;
737
 
        zb.zb_level = -1;
738
 
        zb.zb_blkid = lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
 
790
        SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
 
791
            ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
 
792
            lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ]);
739
793
 
740
794
        if (zilog->zl_root_zio == NULL) {
741
795
                zilog->zl_root_zio = zio_root(zilog->zl_spa, NULL, NULL,
743
797
        }
744
798
        if (lwb->lwb_zio == NULL) {
745
799
                lwb->lwb_zio = zio_rewrite(zilog->zl_root_zio, zilog->zl_spa,
746
 
                    0, &lwb->lwb_blk, lwb->lwb_buf, lwb->lwb_sz,
 
800
                    0, &lwb->lwb_blk, lwb->lwb_buf, BP_GET_LSIZE(&lwb->lwb_blk),
747
801
                    zil_lwb_write_done, lwb, ZIO_PRIORITY_LOG_WRITE,
748
 
                    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE, &zb);
 
802
                    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE, &zb);
749
803
        }
750
804
}
751
805
 
752
806
/*
 
807
 * Define a limited set of intent log block sizes.
 
808
 * These must be a multiple of 4KB. Note only the amount used (again
 
809
 * aligned to 4KB) actually gets written. However, we can't always just
 
810
 * allocate SPA_MAXBLOCKSIZE as the slog space could be exhausted.
 
811
 */
 
812
uint64_t zil_block_buckets[] = {
 
813
    4096,               /* non TX_WRITE */
 
814
    8192+4096,          /* data base */
 
815
    32*1024 + 4096,     /* NFS writes */
 
816
    UINT64_MAX
 
817
};
 
818
 
 
819
/*
 
820
 * Use the slog as long as the logbias is 'latency' and the current commit size
 
821
 * is less than the limit or the total list size is less than 2X the limit.
 
822
 * Limit checking is disabled by setting zil_slog_limit to UINT64_MAX.
 
823
 */
 
824
uint64_t zil_slog_limit = 1024 * 1024;
 
825
#define USE_SLOG(zilog) (((zilog)->zl_logbias == ZFS_LOGBIAS_LATENCY) && \
 
826
        (((zilog)->zl_cur_used < zil_slog_limit) || \
 
827
        ((zilog)->zl_itx_list_sz < (zil_slog_limit << 1))))
 
828
 
 
829
/*
753
830
 * Start a log block write and advance to the next log block.
754
831
 * Calls are serialized.
755
832
 */
756
833
static lwb_t *
757
834
zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
758
835
{
759
 
        lwb_t *nlwb;
760
 
        zil_trailer_t *ztp = (zil_trailer_t *)(lwb->lwb_buf + lwb->lwb_sz) - 1;
 
836
        lwb_t *nlwb = NULL;
 
837
        zil_chain_t *zilc;
761
838
        spa_t *spa = zilog->zl_spa;
762
 
        blkptr_t *bp = &ztp->zit_next_blk;
 
839
        blkptr_t *bp;
 
840
        dmu_tx_t *tx;
763
841
        uint64_t txg;
764
 
        uint64_t zil_blksz;
765
 
        int error;
766
 
 
767
 
        ASSERT(lwb->lwb_nused <= ZIL_BLK_DATA_SZ(lwb));
 
842
        uint64_t zil_blksz, wsz;
 
843
        int i, error;
 
844
 
 
845
        if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
 
846
                zilc = (zil_chain_t *)lwb->lwb_buf;
 
847
                bp = &zilc->zc_next_blk;
 
848
        } else {
 
849
                zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz);
 
850
                bp = &zilc->zc_next_blk;
 
851
        }
 
852
 
 
853
        ASSERT(lwb->lwb_nused <= lwb->lwb_sz);
768
854
 
769
855
        /*
770
856
         * Allocate the next block and save its address in this block
771
857
         * before writing it in order to establish the log chain.
772
858
         * Note that if the allocation of nlwb synced before we wrote
773
859
         * the block that points at it (lwb), we'd leak it if we crashed.
774
 
         * Therefore, we don't do txg_rele_to_sync() until zil_lwb_write_done().
 
860
         * Therefore, we don't do dmu_tx_commit() until zil_lwb_write_done().
 
861
         * We dirty the dataset to ensure that zil_sync() will be called
 
862
         * to clean up in the event of allocation failure or I/O failure.
775
863
         */
776
 
        txg = txg_hold_open(zilog->zl_dmu_pool, &lwb->lwb_txgh);
777
 
        txg_rele_to_quiesce(&lwb->lwb_txgh);
 
864
        tx = dmu_tx_create(zilog->zl_os);
 
865
        VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
 
866
        dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
 
867
        txg = dmu_tx_get_txg(tx);
 
868
 
 
869
        lwb->lwb_tx = tx;
778
870
 
779
871
        /*
780
 
         * Pick a ZIL blocksize. We request a size that is the
781
 
         * maximum of the previous used size, the current used size and
782
 
         * the amount waiting in the queue.
 
872
         * Log blocks are pre-allocated. Here we select the size of the next
 
873
         * block, based on size used in the last block.
 
874
         * - first find the smallest bucket that will fit the block from a
 
875
         *   limited set of block sizes. This is because it's faster to write
 
876
         *   blocks allocated from the same metaslab as they are adjacent or
 
877
         *   close.
 
878
         * - next find the maximum from the new suggested size and an array of
 
879
         *   previous sizes. This lessens a picket fence effect of wrongly
 
880
         *   guesssing the size if we have a stream of say 2k, 64k, 2k, 64k
 
881
         *   requests.
 
882
         *
 
883
         * Note we only write what is used, but we can't just allocate
 
884
         * the maximum block size because we can exhaust the available
 
885
         * pool log space.
783
886
         */
784
 
        zil_blksz = MAX(zilog->zl_prev_used,
785
 
            zilog->zl_cur_used + sizeof (*ztp));
786
 
        zil_blksz = MAX(zil_blksz, zilog->zl_itx_list_sz + sizeof (*ztp));
787
 
        zil_blksz = P2ROUNDUP_TYPED(zil_blksz, ZIL_MIN_BLKSZ, uint64_t);
788
 
        if (zil_blksz > ZIL_MAX_BLKSZ)
789
 
                zil_blksz = ZIL_MAX_BLKSZ;
 
887
        zil_blksz = zilog->zl_cur_used + sizeof (zil_chain_t);
 
888
        for (i = 0; zil_blksz > zil_block_buckets[i]; i++)
 
889
                continue;
 
890
        zil_blksz = zil_block_buckets[i];
 
891
        if (zil_blksz == UINT64_MAX)
 
892
                zil_blksz = SPA_MAXBLOCKSIZE;
 
893
        zilog->zl_prev_blks[zilog->zl_prev_rotor] = zil_blksz;
 
894
        for (i = 0; i < ZIL_PREV_BLKS; i++)
 
895
                zil_blksz = MAX(zil_blksz, zilog->zl_prev_blks[i]);
 
896
        zilog->zl_prev_rotor = (zilog->zl_prev_rotor + 1) & (ZIL_PREV_BLKS - 1);
790
897
 
791
898
        BP_ZERO(bp);
792
899
        /* pass the old blkptr in order to spread log blocks across devs */
793
 
        error = zio_alloc_blk(spa, zil_blksz, bp, &lwb->lwb_blk, txg);
794
 
        if (error) {
795
 
                dmu_tx_t *tx = dmu_tx_create_assigned(zilog->zl_dmu_pool, txg);
796
 
 
797
 
                /*
798
 
                 * We dirty the dataset to ensure that zil_sync() will
799
 
                 * be called to remove this lwb from our zl_lwb_list.
800
 
                 * Failing to do so, may leave an lwb with a NULL lwb_buf
801
 
                 * hanging around on the zl_lwb_list.
802
 
                 */
803
 
                dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
804
 
                dmu_tx_commit(tx);
805
 
 
806
 
                /*
807
 
                 * Since we've just experienced an allocation failure so we
808
 
                 * terminate the current lwb and send it on its way.
809
 
                 */
810
 
                ztp->zit_pad = 0;
811
 
                ztp->zit_nused = lwb->lwb_nused;
812
 
                ztp->zit_bt.zbt_cksum = lwb->lwb_blk.blk_cksum;
813
 
                zio_nowait(lwb->lwb_zio);
814
 
 
815
 
                /*
816
 
                 * By returning NULL the caller will call tx_wait_synced()
817
 
                 */
818
 
                return (NULL);
819
 
        }
820
 
 
821
 
        ASSERT3U(bp->blk_birth, ==, txg);
822
 
        ztp->zit_pad = 0;
823
 
        ztp->zit_nused = lwb->lwb_nused;
824
 
        ztp->zit_bt.zbt_cksum = lwb->lwb_blk.blk_cksum;
825
 
        bp->blk_cksum = lwb->lwb_blk.blk_cksum;
826
 
        bp->blk_cksum.zc_word[ZIL_ZC_SEQ]++;
827
 
 
828
 
        /*
829
 
         * Allocate a new log write buffer (lwb).
830
 
         */
831
 
        nlwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
832
 
 
833
 
        nlwb->lwb_zilog = zilog;
834
 
        nlwb->lwb_blk = *bp;
835
 
        nlwb->lwb_nused = 0;
836
 
        nlwb->lwb_sz = BP_GET_LSIZE(&nlwb->lwb_blk);
837
 
        nlwb->lwb_buf = zio_buf_alloc(nlwb->lwb_sz);
838
 
        nlwb->lwb_max_txg = txg;
839
 
        nlwb->lwb_zio = NULL;
840
 
 
841
 
        /*
842
 
         * Put new lwb at the end of the log chain
843
 
         */
844
 
        mutex_enter(&zilog->zl_lock);
845
 
        list_insert_tail(&zilog->zl_lwb_list, nlwb);
846
 
        mutex_exit(&zilog->zl_lock);
847
 
 
848
 
        /* Record the block for later vdev flushing */
849
 
        zil_add_block(zilog, &lwb->lwb_blk);
850
 
 
851
 
        /*
852
 
         * kick off the write for the old log block
853
 
         */
854
 
        dprintf_bp(&lwb->lwb_blk, "lwb %p txg %llu: ", lwb, txg);
855
 
        ASSERT(lwb->lwb_zio);
856
 
        zio_nowait(lwb->lwb_zio);
857
 
 
 
900
        error = zio_alloc_zil(spa, txg, bp, &lwb->lwb_blk, zil_blksz,
 
901
            USE_SLOG(zilog));
 
902
        if (!error) {
 
903
                ASSERT3U(bp->blk_birth, ==, txg);
 
904
                bp->blk_cksum = lwb->lwb_blk.blk_cksum;
 
905
                bp->blk_cksum.zc_word[ZIL_ZC_SEQ]++;
 
906
 
 
907
                /*
 
908
                 * Allocate a new log write buffer (lwb).
 
909
                 */
 
910
                nlwb = zil_alloc_lwb(zilog, bp, txg);
 
911
 
 
912
                /* Record the block for later vdev flushing */
 
913
                zil_add_block(zilog, &lwb->lwb_blk);
 
914
        }
 
915
 
 
916
        if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
 
917
                /* For Slim ZIL only write what is used. */
 
918
                wsz = P2ROUNDUP_TYPED(lwb->lwb_nused, ZIL_MIN_BLKSZ, uint64_t);
 
919
                ASSERT3U(wsz, <=, lwb->lwb_sz);
 
920
                zio_shrink(lwb->lwb_zio, wsz);
 
921
 
 
922
        } else {
 
923
                wsz = lwb->lwb_sz;
 
924
        }
 
925
 
 
926
        zilc->zc_pad = 0;
 
927
        zilc->zc_nused = lwb->lwb_nused;
 
928
        zilc->zc_eck.zec_cksum = lwb->lwb_blk.blk_cksum;
 
929
 
 
930
        /*
 
931
         * clear unused data for security
 
932
         */
 
933
        bzero(lwb->lwb_buf + lwb->lwb_nused, wsz - lwb->lwb_nused);
 
934
 
 
935
        zio_nowait(lwb->lwb_zio); /* Kick off the write for the old log block */
 
936
 
 
937
        /*
 
938
         * If there was an allocation failure then nlwb will be null which
 
939
         * forces a txg_wait_synced().
 
940
         */
858
941
        return (nlwb);
859
942
}
860
943
 
862
945
zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
863
946
{
864
947
        lr_t *lrc = &itx->itx_lr; /* common log record */
865
 
        lr_write_t *lr = (lr_write_t *)lrc;
 
948
        lr_write_t *lrw = (lr_write_t *)lrc;
 
949
        char *lr_buf;
866
950
        uint64_t txg = lrc->lrc_txg;
867
951
        uint64_t reclen = lrc->lrc_reclen;
868
 
        uint64_t dlen;
 
952
        uint64_t dlen = 0;
869
953
 
870
954
        if (lwb == NULL)
871
955
                return (NULL);
 
956
 
872
957
        ASSERT(lwb->lwb_buf != NULL);
873
958
 
874
959
        if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY)
875
960
                dlen = P2ROUNDUP_TYPED(
876
 
                    lr->lr_length, sizeof (uint64_t), uint64_t);
877
 
        else
878
 
                dlen = 0;
 
961
                    lrw->lr_length, sizeof (uint64_t), uint64_t);
879
962
 
880
963
        zilog->zl_cur_used += (reclen + dlen);
881
964
 
884
967
        /*
885
968
         * If this record won't fit in the current log block, start a new one.
886
969
         */
887
 
        if (lwb->lwb_nused + reclen + dlen > ZIL_BLK_DATA_SZ(lwb)) {
 
970
        if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
888
971
                lwb = zil_lwb_write_start(zilog, lwb);
889
972
                if (lwb == NULL)
890
973
                        return (NULL);
891
974
                zil_lwb_write_init(zilog, lwb);
892
 
                ASSERT(lwb->lwb_nused == 0);
893
 
                if (reclen + dlen > ZIL_BLK_DATA_SZ(lwb)) {
 
975
                ASSERT(LWB_EMPTY(lwb));
 
976
                if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
894
977
                        txg_wait_synced(zilog->zl_dmu_pool, txg);
895
978
                        return (lwb);
896
979
                }
897
980
        }
898
981
 
899
 
        /*
900
 
         * Update the lrc_seq, to be log record sequence number. See zil.h
901
 
         * Then copy the record to the log buffer.
902
 
         */
903
 
        lrc->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
904
 
        bcopy(lrc, lwb->lwb_buf + lwb->lwb_nused, reclen);
 
982
        lr_buf = lwb->lwb_buf + lwb->lwb_nused;
 
983
        bcopy(lrc, lr_buf, reclen);
 
984
        lrc = (lr_t *)lr_buf;
 
985
        lrw = (lr_write_t *)lrc;
905
986
 
906
987
        /*
907
988
         * If it's a write, fetch the data or get its blkptr as appropriate.
913
994
                        char *dbuf;
914
995
                        int error;
915
996
 
916
 
                        /* alignment is guaranteed */
917
 
                        lr = (lr_write_t *)(lwb->lwb_buf + lwb->lwb_nused);
918
997
                        if (dlen) {
919
998
                                ASSERT(itx->itx_wr_state == WR_NEED_COPY);
920
 
                                dbuf = lwb->lwb_buf + lwb->lwb_nused + reclen;
921
 
                                lr->lr_common.lrc_reclen += dlen;
 
999
                                dbuf = lr_buf + reclen;
 
1000
                                lrw->lr_common.lrc_reclen += dlen;
922
1001
                        } else {
923
1002
                                ASSERT(itx->itx_wr_state == WR_INDIRECT);
924
1003
                                dbuf = NULL;
925
1004
                        }
926
1005
                        error = zilog->zl_get_data(
927
 
                            itx->itx_private, lr, dbuf, lwb->lwb_zio);
 
1006
                            itx->itx_private, lrw, dbuf, lwb->lwb_zio);
 
1007
                        if (error == EIO) {
 
1008
                                txg_wait_synced(zilog->zl_dmu_pool, txg);
 
1009
                                return (lwb);
 
1010
                        }
928
1011
                        if (error) {
929
1012
                                ASSERT(error == ENOENT || error == EEXIST ||
930
1013
                                    error == EALREADY);
933
1016
                }
934
1017
        }
935
1018
 
 
1019
        /*
 
1020
         * We're actually making an entry, so update lrc_seq to be the
 
1021
         * log record sequence number.  Note that this is generally not
 
1022
         * equal to the itx sequence number because not all transactions
 
1023
         * are synchronous, and sometimes spa_sync() gets there first.
 
1024
         */
 
1025
        lrc->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
936
1026
        lwb->lwb_nused += reclen + dlen;
937
1027
        lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
938
 
        ASSERT3U(lwb->lwb_nused, <=, ZIL_BLK_DATA_SZ(lwb));
 
1028
        ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz);
939
1029
        ASSERT3U(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)), ==, 0);
940
1030
 
941
1031
        return (lwb);
957
1047
        return (itx);
958
1048
}
959
1049
 
 
1050
void
 
1051
zil_itx_destroy(itx_t *itx)
 
1052
{
 
1053
        kmem_free(itx, offsetof(itx_t, itx_lr) + itx->itx_lr.lrc_reclen);
 
1054
}
 
1055
 
960
1056
uint64_t
961
1057
zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
962
1058
{
963
1059
        uint64_t seq;
964
1060
 
965
1061
        ASSERT(itx->itx_lr.lrc_seq == 0);
 
1062
        ASSERT(!zilog->zl_replay);
966
1063
 
967
1064
        mutex_enter(&zilog->zl_lock);
968
1065
        list_insert_tail(&zilog->zl_itx_list, itx);
1011
1108
        /* destroy sync'd log transactions */
1012
1109
        while ((itx = list_head(&clean_list)) != NULL) {
1013
1110
                list_remove(&clean_list, itx);
1014
 
                kmem_free(itx, offsetof(itx_t, itx_lr)
1015
 
                    + itx->itx_lr.lrc_reclen);
 
1111
                zil_itx_destroy(itx);
1016
1112
        }
1017
1113
        list_destroy(&clean_list);
1018
1114
}
1031
1127
        if ((itx != NULL) &&
1032
1128
            (itx->itx_lr.lrc_txg <= spa_last_synced_txg(zilog->zl_spa))) {
1033
1129
                (void) taskq_dispatch(zilog->zl_clean_taskq,
1034
 
                    (task_func_t *)zil_itx_clean, zilog, TQ_SLEEP);
 
1130
                    (task_func_t *)zil_itx_clean, zilog, TQ_NOSLEEP);
1035
1131
        }
1036
1132
        mutex_exit(&zilog->zl_lock);
1037
1133
}
1041
1137
{
1042
1138
        uint64_t txg;
1043
1139
        uint64_t commit_seq = 0;
1044
 
        itx_t *itx, *itx_next = (itx_t *)-1;
 
1140
        itx_t *itx, *itx_next;
1045
1141
        lwb_t *lwb;
1046
1142
        spa_t *spa;
 
1143
        int error = 0;
1047
1144
 
1048
1145
        zilog->zl_writer = B_TRUE;
1049
1146
        ASSERT(zilog->zl_root_zio == NULL);
1063
1160
                                return;
1064
1161
                        }
1065
1162
                        mutex_exit(&zilog->zl_lock);
1066
 
                        zil_create(zilog);
 
1163
                        lwb = zil_create(zilog);
1067
1164
                        mutex_enter(&zilog->zl_lock);
1068
 
                        lwb = list_tail(&zilog->zl_lwb_list);
1069
1165
                }
1070
1166
        }
 
1167
        ASSERT(lwb == NULL || lwb->lwb_zio == NULL);
1071
1168
 
1072
1169
        /* Loop through in-memory log transactions filling log blocks. */
1073
1170
        DTRACE_PROBE1(zil__cw1, zilog_t *, zilog);
1074
 
        for (;;) {
1075
 
                /*
1076
 
                 * Find the next itx to push:
1077
 
                 * Push all transactions related to specified foid and all
1078
 
                 * other transactions except TX_WRITE, TX_TRUNCATE,
1079
 
                 * TX_SETATTR and TX_ACL for all other files.
1080
 
                 */
1081
 
                if (itx_next != (itx_t *)-1)
1082
 
                        itx = itx_next;
1083
 
                else
1084
 
                        itx = list_head(&zilog->zl_itx_list);
1085
 
                for (; itx != NULL; itx = list_next(&zilog->zl_itx_list, itx)) {
1086
 
                        if (foid == 0) /* push all foids? */
1087
 
                                break;
1088
 
                        if (itx->itx_sync) /* push all O_[D]SYNC */
1089
 
                                break;
1090
 
                        switch (itx->itx_lr.lrc_txtype) {
1091
 
                        case TX_SETATTR:
1092
 
                        case TX_WRITE:
1093
 
                        case TX_TRUNCATE:
1094
 
                        case TX_ACL:
1095
 
                                /* lr_foid is same offset for these records */
1096
 
                                if (((lr_write_t *)&itx->itx_lr)->lr_foid
1097
 
                                    != foid) {
1098
 
                                        continue; /* skip this record */
1099
 
                                }
1100
 
                        }
1101
 
                        break;
1102
 
                }
1103
 
                if (itx == NULL)
1104
 
                        break;
 
1171
 
 
1172
        for (itx = list_head(&zilog->zl_itx_list); itx; itx = itx_next) {
 
1173
                /*
 
1174
                 * Save the next pointer.  Even though we drop zl_lock below,
 
1175
                 * all threads that can remove itx list entries (other writers
 
1176
                 * and zil_itx_clean()) can't do so until they have zl_writer.
 
1177
                 */
 
1178
                itx_next = list_next(&zilog->zl_itx_list, itx);
 
1179
 
 
1180
                /*
 
1181
                 * Determine whether to push this itx.
 
1182
                 * Push all transactions related to specified foid and
 
1183
                 * all other transactions except those that can be logged
 
1184
                 * out of order (TX_WRITE, TX_TRUNCATE, TX_SETATTR, TX_ACL)
 
1185
                 * for all other files.
 
1186
                 *
 
1187
                 * If foid == 0 (meaning "push all foids") or
 
1188
                 * itx->itx_sync is set (meaning O_[D]SYNC), push regardless.
 
1189
                 */
 
1190
                if (foid != 0 && !itx->itx_sync &&
 
1191
                    TX_OOO(itx->itx_lr.lrc_txtype) &&
 
1192
                    ((lr_ooo_t *)&itx->itx_lr)->lr_foid != foid)
 
1193
                        continue; /* skip this record */
1105
1194
 
1106
1195
                if ((itx->itx_lr.lrc_seq > seq) &&
1107
 
                    ((lwb == NULL) || (lwb->lwb_nused == 0) ||
1108
 
                    (lwb->lwb_nused + itx->itx_sod > ZIL_BLK_DATA_SZ(lwb)))) {
 
1196
                    ((lwb == NULL) || (LWB_EMPTY(lwb)) ||
 
1197
                    (lwb->lwb_nused + itx->itx_sod > lwb->lwb_sz)))
1109
1198
                        break;
1110
 
                }
1111
1199
 
1112
 
                /*
1113
 
                 * Save the next pointer.  Even though we soon drop
1114
 
                 * zl_lock all threads that may change the list
1115
 
                 * (another writer or zil_itx_clean) can't do so until
1116
 
                 * they have zl_writer.
1117
 
                 */
1118
 
                itx_next = list_next(&zilog->zl_itx_list, itx);
1119
1200
                list_remove(&zilog->zl_itx_list, itx);
1120
1201
                zilog->zl_itx_list_sz -= itx->itx_sod;
 
1202
 
1121
1203
                mutex_exit(&zilog->zl_lock);
 
1204
 
1122
1205
                txg = itx->itx_lr.lrc_txg;
1123
1206
                ASSERT(txg);
1124
1207
 
1125
1208
                if (txg > spa_last_synced_txg(spa) ||
1126
1209
                    txg > spa_freeze_txg(spa))
1127
1210
                        lwb = zil_lwb_commit(zilog, itx, lwb);
1128
 
                kmem_free(itx, offsetof(itx_t, itx_lr)
1129
 
                    + itx->itx_lr.lrc_reclen);
 
1211
 
 
1212
                zil_itx_destroy(itx);
 
1213
 
1130
1214
                mutex_enter(&zilog->zl_lock);
1131
1215
        }
1132
1216
        DTRACE_PROBE1(zil__cw2, zilog_t *, zilog);
1133
1217
        /* determine commit sequence number */
1134
1218
        itx = list_head(&zilog->zl_itx_list);
1135
1219
        if (itx)
1136
 
                commit_seq = itx->itx_lr.lrc_seq;
 
1220
                commit_seq = itx->itx_lr.lrc_seq - 1;
1137
1221
        else
1138
1222
                commit_seq = zilog->zl_itx_seq;
1139
1223
        mutex_exit(&zilog->zl_lock);
1150
1234
         */
1151
1235
        if (zilog->zl_root_zio) {
1152
1236
                DTRACE_PROBE1(zil__cw3, zilog_t *, zilog);
1153
 
                (void) zio_wait(zilog->zl_root_zio);
 
1237
                error = zio_wait(zilog->zl_root_zio);
1154
1238
                zilog->zl_root_zio = NULL;
1155
1239
                DTRACE_PROBE1(zil__cw4, zilog_t *, zilog);
1156
1240
                zil_flush_vdevs(zilog);
1157
1241
        }
1158
1242
 
1159
 
        if (zilog->zl_log_error || lwb == NULL) {
1160
 
                zilog->zl_log_error = 0;
 
1243
        if (error || lwb == NULL)
1161
1244
                txg_wait_synced(zilog->zl_dmu_pool, 0);
1162
 
        }
1163
1245
 
1164
1246
        mutex_enter(&zilog->zl_lock);
1165
1247
        zilog->zl_writer = B_FALSE;
1166
1248
 
1167
1249
        ASSERT3U(commit_seq, >=, zilog->zl_commit_seq);
1168
1250
        zilog->zl_commit_seq = commit_seq;
 
1251
 
 
1252
        /*
 
1253
         * Remember the highest committed log sequence number for ztest.
 
1254
         * We only update this value when all the log writes succeeded,
 
1255
         * because ztest wants to ASSERT that it got the whole log chain.
 
1256
         */
 
1257
        if (error == 0 && lwb != NULL)
 
1258
                zilog->zl_commit_lr_seq = zilog->zl_lr_seq;
1169
1259
}
1170
1260
 
1171
1261
/*
1185
1275
 
1186
1276
        while (zilog->zl_writer) {
1187
1277
                cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
1188
 
                if (seq < zilog->zl_commit_seq) {
 
1278
                if (seq <= zilog->zl_commit_seq) {
1189
1279
                        mutex_exit(&zilog->zl_lock);
1190
1280
                        return;
1191
1281
                }
1197
1287
}
1198
1288
 
1199
1289
/*
 
1290
 * Report whether all transactions are committed.
 
1291
 */
 
1292
static boolean_t
 
1293
zil_is_committed(zilog_t *zilog)
 
1294
{
 
1295
        lwb_t *lwb;
 
1296
        boolean_t committed;
 
1297
 
 
1298
        mutex_enter(&zilog->zl_lock);
 
1299
 
 
1300
        while (zilog->zl_writer)
 
1301
                cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
 
1302
 
 
1303
        if (!list_is_empty(&zilog->zl_itx_list))
 
1304
                committed = B_FALSE;            /* unpushed transactions */
 
1305
        else if ((lwb = list_head(&zilog->zl_lwb_list)) == NULL)
 
1306
                committed = B_TRUE;             /* intent log never used */
 
1307
        else if (list_next(&zilog->zl_lwb_list, lwb) != NULL)
 
1308
                committed = B_FALSE;            /* zil_sync() not done yet */
 
1309
        else
 
1310
                committed = B_TRUE;             /* everything synced */
 
1311
 
 
1312
        mutex_exit(&zilog->zl_lock);
 
1313
        return (committed);
 
1314
}
 
1315
 
 
1316
/*
1200
1317
 * Called in syncing context to free committed log blocks and update log header.
1201
1318
 */
1202
1319
void
1205
1322
        zil_header_t *zh = zil_header_in_syncing_context(zilog);
1206
1323
        uint64_t txg = dmu_tx_get_txg(tx);
1207
1324
        spa_t *spa = zilog->zl_spa;
 
1325
        uint64_t *replayed_seq = &zilog->zl_replayed_seq[txg & TXG_MASK];
1208
1326
        lwb_t *lwb;
1209
1327
 
1210
1328
        /*
1218
1336
 
1219
1337
        ASSERT(zilog->zl_stop_sync == 0);
1220
1338
 
1221
 
        zh->zh_replay_seq = zilog->zl_replayed_seq[txg & TXG_MASK];
 
1339
        if (*replayed_seq != 0) {
 
1340
                ASSERT(zh->zh_replay_seq < *replayed_seq);
 
1341
                zh->zh_replay_seq = *replayed_seq;
 
1342
                *replayed_seq = 0;
 
1343
        }
1222
1344
 
1223
1345
        if (zilog->zl_destroy_txg == txg) {
1224
1346
                blkptr_t blk = zh->zh_log;
1247
1369
                if (lwb->lwb_buf != NULL || lwb->lwb_max_txg > txg)
1248
1370
                        break;
1249
1371
                list_remove(&zilog->zl_lwb_list, lwb);
1250
 
                zio_free_blk(spa, &lwb->lwb_blk, txg);
 
1372
                zio_free_zil(spa, txg, &lwb->lwb_blk);
1251
1373
                kmem_cache_free(zil_lwb_cache, lwb);
1252
1374
 
1253
1375
                /*
1275
1397
        kmem_cache_destroy(zil_lwb_cache);
1276
1398
}
1277
1399
 
 
1400
void
 
1401
zil_set_logbias(zilog_t *zilog, uint64_t logbias)
 
1402
{
 
1403
        zilog->zl_logbias = logbias;
 
1404
}
 
1405
 
1278
1406
zilog_t *
1279
1407
zil_alloc(objset_t *os, zil_header_t *zh_phys)
1280
1408
{
1287
1415
        zilog->zl_spa = dmu_objset_spa(os);
1288
1416
        zilog->zl_dmu_pool = dmu_objset_pool(os);
1289
1417
        zilog->zl_destroy_txg = TXG_INITIAL - 1;
 
1418
        zilog->zl_logbias = dmu_objset_logbias(os);
1290
1419
 
1291
1420
        mutex_init(&zilog->zl_lock, NULL, MUTEX_DEFAULT, NULL);
1292
1421
 
1363
1492
        if (!zil_is_committed(zilog)) {
1364
1493
                uint64_t txg;
1365
1494
                dmu_tx_t *tx = dmu_tx_create(zilog->zl_os);
1366
 
                (void) dmu_tx_assign(tx, TXG_WAIT);
 
1495
                VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
1367
1496
                dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
1368
1497
                txg = dmu_tx_get_txg(tx);
1369
1498
                dmu_tx_commit(tx);
1437
1566
}
1438
1567
 
1439
1568
typedef struct zil_replay_arg {
1440
 
        objset_t        *zr_os;
1441
1569
        zil_replay_func_t **zr_replay;
1442
1570
        void            *zr_arg;
1443
1571
        boolean_t       zr_byteswap;
1444
 
        char            *zr_lrbuf;
 
1572
        char            *zr_lr;
1445
1573
} zil_replay_arg_t;
1446
1574
 
1447
 
static void
 
1575
static int
 
1576
zil_replay_error(zilog_t *zilog, lr_t *lr, int error)
 
1577
{
 
1578
        char name[MAXNAMELEN];
 
1579
 
 
1580
        zilog->zl_replaying_seq--;      /* didn't actually replay this one */
 
1581
 
 
1582
        dmu_objset_name(zilog->zl_os, name);
 
1583
 
 
1584
        cmn_err(CE_WARN, "ZFS replay transaction error %d, "
 
1585
            "dataset %s, seq 0x%llx, txtype %llu %s\n", error, name,
 
1586
            (u_longlong_t)lr->lrc_seq,
 
1587
            (u_longlong_t)(lr->lrc_txtype & ~TX_CI),
 
1588
            (lr->lrc_txtype & TX_CI) ? "CI" : "");
 
1589
 
 
1590
        return (error);
 
1591
}
 
1592
 
 
1593
static int
1448
1594
zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
1449
1595
{
1450
1596
        zil_replay_arg_t *zr = zra;
1451
1597
        const zil_header_t *zh = zilog->zl_header;
1452
1598
        uint64_t reclen = lr->lrc_reclen;
1453
1599
        uint64_t txtype = lr->lrc_txtype;
1454
 
        char *name;
1455
 
        int pass, error;
1456
 
 
1457
 
        if (!zilog->zl_replay)                  /* giving up */
1458
 
                return;
 
1600
        int error = 0;
 
1601
 
 
1602
        zilog->zl_replaying_seq = lr->lrc_seq;
 
1603
 
 
1604
        if (lr->lrc_seq <= zh->zh_replay_seq)   /* already replayed */
 
1605
                return (0);
1459
1606
 
1460
1607
        if (lr->lrc_txg < claim_txg)            /* already committed */
1461
 
                return;
1462
 
 
1463
 
        if (lr->lrc_seq <= zh->zh_replay_seq)   /* already replayed */
1464
 
                return;
 
1608
                return (0);
1465
1609
 
1466
1610
        /* Strip case-insensitive bit, still present in log record */
1467
1611
        txtype &= ~TX_CI;
1468
1612
 
1469
 
        if (txtype == 0 || txtype >= TX_MAX_TYPE) {
1470
 
                error = EINVAL;
1471
 
                goto bad;
 
1613
        if (txtype == 0 || txtype >= TX_MAX_TYPE)
 
1614
                return (zil_replay_error(zilog, lr, EINVAL));
 
1615
 
 
1616
        /*
 
1617
         * If this record type can be logged out of order, the object
 
1618
         * (lr_foid) may no longer exist.  That's legitimate, not an error.
 
1619
         */
 
1620
        if (TX_OOO(txtype)) {
 
1621
                error = dmu_object_info(zilog->zl_os,
 
1622
                    ((lr_ooo_t *)lr)->lr_foid, NULL);
 
1623
                if (error == ENOENT || error == EEXIST)
 
1624
                        return (0);
1472
1625
        }
1473
1626
 
1474
1627
        /*
1475
1628
         * Make a copy of the data so we can revise and extend it.
1476
1629
         */
1477
 
        bcopy(lr, zr->zr_lrbuf, reclen);
 
1630
        bcopy(lr, zr->zr_lr, reclen);
 
1631
 
 
1632
        /*
 
1633
         * If this is a TX_WRITE with a blkptr, suck in the data.
 
1634
         */
 
1635
        if (txtype == TX_WRITE && reclen == sizeof (lr_write_t)) {
 
1636
                error = zil_read_log_data(zilog, (lr_write_t *)lr,
 
1637
                    zr->zr_lr + reclen);
 
1638
                if (error)
 
1639
                        return (zil_replay_error(zilog, lr, error));
 
1640
        }
1478
1641
 
1479
1642
        /*
1480
1643
         * The log block containing this lr may have been byteswapped
1481
1644
         * so that we can easily examine common fields like lrc_txtype.
1482
 
         * However, the log is a mix of different data types, and only the
 
1645
         * However, the log is a mix of different record types, and only the
1483
1646
         * replay vectors know how to byteswap their records.  Therefore, if
1484
1647
         * the lr was byteswapped, undo it before invoking the replay vector.
1485
1648
         */
1486
1649
        if (zr->zr_byteswap)
1487
 
                byteswap_uint64_array(zr->zr_lrbuf, reclen);
1488
 
 
1489
 
        /*
1490
 
         * If this is a TX_WRITE with a blkptr, suck in the data.
1491
 
         */
1492
 
        if (txtype == TX_WRITE && reclen == sizeof (lr_write_t)) {
1493
 
                lr_write_t *lrw = (lr_write_t *)lr;
1494
 
                blkptr_t *wbp = &lrw->lr_blkptr;
1495
 
                uint64_t wlen = lrw->lr_length;
1496
 
                char *wbuf = zr->zr_lrbuf + reclen;
1497
 
 
1498
 
                if (BP_IS_HOLE(wbp)) {  /* compressed to a hole */
1499
 
                        bzero(wbuf, wlen);
1500
 
                } else {
1501
 
                        /*
1502
 
                         * A subsequent write may have overwritten this block,
1503
 
                         * in which case wbp may have been been freed and
1504
 
                         * reallocated, and our read of wbp may fail with a
1505
 
                         * checksum error.  We can safely ignore this because
1506
 
                         * the later write will provide the correct data.
1507
 
                         */
1508
 
                        zbookmark_t zb;
1509
 
 
1510
 
                        zb.zb_objset = dmu_objset_id(zilog->zl_os);
1511
 
                        zb.zb_object = lrw->lr_foid;
1512
 
                        zb.zb_level = -1;
1513
 
                        zb.zb_blkid = lrw->lr_offset / BP_GET_LSIZE(wbp);
1514
 
 
1515
 
                        (void) zio_wait(zio_read(NULL, zilog->zl_spa,
1516
 
                            wbp, wbuf, BP_GET_LSIZE(wbp), NULL, NULL,
1517
 
                            ZIO_PRIORITY_SYNC_READ,
1518
 
                            ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE, &zb));
1519
 
                        (void) memmove(wbuf, wbuf + lrw->lr_blkoff, wlen);
1520
 
                }
1521
 
        }
 
1650
                byteswap_uint64_array(zr->zr_lr, reclen);
1522
1651
 
1523
1652
        /*
1524
1653
         * We must now do two things atomically: replay this log record,
1526
1655
         * we did so. At the end of each replay function the sequence number
1527
1656
         * is updated if we are in replay mode.
1528
1657
         */
1529
 
        for (pass = 1; pass <= 2; pass++) {
1530
 
                zilog->zl_replaying_seq = lr->lrc_seq;
1531
 
                /* Only byteswap (if needed) on the 1st pass.  */
1532
 
                error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lrbuf,
1533
 
                    zr->zr_byteswap && pass == 1);
1534
 
 
1535
 
                if (!error)
1536
 
                        return;
1537
 
 
 
1658
        error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, zr->zr_byteswap);
 
1659
        if (error) {
1538
1660
                /*
1539
1661
                 * The DMU's dnode layer doesn't see removes until the txg
1540
1662
                 * commits, so a subsequent claim can spuriously fail with
1541
1663
                 * EEXIST. So if we receive any error we try syncing out
1542
 
                 * any removes then retry the transaction.
 
1664
                 * any removes then retry the transaction.  Note that we
 
1665
                 * specify B_FALSE for byteswap now, so we don't do it twice.
1543
1666
                 */
1544
 
                if (pass == 1)
1545
 
                        txg_wait_synced(spa_get_dsl(zilog->zl_spa), 0);
 
1667
                txg_wait_synced(spa_get_dsl(zilog->zl_spa), 0);
 
1668
                error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, B_FALSE);
 
1669
                if (error)
 
1670
                        return (zil_replay_error(zilog, lr, error));
1546
1671
        }
1547
 
 
1548
 
bad:
1549
 
        ASSERT(error);
1550
 
        name = kmem_alloc(MAXNAMELEN, KM_SLEEP);
1551
 
        dmu_objset_name(zr->zr_os, name);
1552
 
        cmn_err(CE_WARN, "ZFS replay transaction error %d, "
1553
 
            "dataset %s, seq 0x%llx, txtype %llu %s\n",
1554
 
            error, name, (u_longlong_t)lr->lrc_seq, (u_longlong_t)txtype,
1555
 
            (lr->lrc_txtype & TX_CI) ? "CI" : "");
1556
 
        zilog->zl_replay = B_FALSE;
1557
 
        kmem_free(name, MAXNAMELEN);
 
1672
        return (0);
1558
1673
}
1559
1674
 
1560
1675
/* ARGSUSED */
1561
 
static void
 
1676
static int
1562
1677
zil_incr_blks(zilog_t *zilog, blkptr_t *bp, void *arg, uint64_t claim_txg)
1563
1678
{
1564
1679
        zilog->zl_replay_blks++;
 
1680
 
 
1681
        return (0);
1565
1682
}
1566
1683
 
1567
1684
/*
1579
1696
                return;
1580
1697
        }
1581
1698
 
1582
 
        zr.zr_os = os;
1583
1699
        zr.zr_replay = replay_func;
1584
1700
        zr.zr_arg = arg;
1585
1701
        zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zh->zh_log);
1586
 
        zr.zr_lrbuf = kmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
 
1702
        zr.zr_lr = kmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
1587
1703
 
1588
1704
        /*
1589
1705
         * Wait for in-progress removes to sync before starting replay.
1595
1711
        ASSERT(zilog->zl_replay_blks == 0);
1596
1712
        (void) zil_parse(zilog, zil_incr_blks, zil_replay_log_record, &zr,
1597
1713
            zh->zh_claim_txg);
1598
 
        kmem_free(zr.zr_lrbuf, 2 * SPA_MAXBLOCKSIZE);
 
1714
        kmem_free(zr.zr_lr, 2 * SPA_MAXBLOCKSIZE);
1599
1715
 
1600
1716
        zil_destroy(zilog, B_FALSE);
1601
1717
        txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
1602
1718
        zilog->zl_replay = B_FALSE;
1603
1719
}
1604
1720
 
1605
 
/*
1606
 
 * Report whether all transactions are committed
1607
 
 */
1608
 
int
1609
 
zil_is_committed(zilog_t *zilog)
 
1721
boolean_t
 
1722
zil_replaying(zilog_t *zilog, dmu_tx_t *tx)
1610
1723
{
1611
 
        lwb_t *lwb;
1612
 
        int ret;
1613
 
 
1614
 
        mutex_enter(&zilog->zl_lock);
1615
 
        while (zilog->zl_writer)
1616
 
                cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
1617
 
 
1618
 
        /* recent unpushed intent log transactions? */
1619
 
        if (!list_is_empty(&zilog->zl_itx_list)) {
1620
 
                ret = B_FALSE;
1621
 
                goto out;
1622
 
        }
1623
 
 
1624
 
        /* intent log never used? */
1625
 
        lwb = list_head(&zilog->zl_lwb_list);
1626
 
        if (lwb == NULL) {
1627
 
                ret = B_TRUE;
1628
 
                goto out;
1629
 
        }
1630
 
 
1631
 
        /*
1632
 
         * more than 1 log buffer means zil_sync() hasn't yet freed
1633
 
         * entries after a txg has committed
1634
 
         */
1635
 
        if (list_next(&zilog->zl_lwb_list, lwb)) {
1636
 
                ret = B_FALSE;
1637
 
                goto out;
1638
 
        }
1639
 
 
1640
 
        ASSERT(zil_empty(zilog));
1641
 
        ret = B_TRUE;
1642
 
out:
1643
 
        cv_broadcast(&zilog->zl_cv_writer);
1644
 
        mutex_exit(&zilog->zl_lock);
1645
 
        return (ret);
 
1724
        if (zilog == NULL)
 
1725
                return (B_TRUE);
 
1726
 
 
1727
        if (zilog->zl_replay) {
 
1728
                dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
 
1729
                zilog->zl_replayed_seq[dmu_tx_get_txg(tx) & TXG_MASK] =
 
1730
                    zilog->zl_replaying_seq;
 
1731
                return (B_TRUE);
 
1732
        }
 
1733
 
 
1734
        return (B_FALSE);
1646
1735
}
1647
1736
 
1648
1737
/* ARGSUSED */
1649
1738
int
1650
 
zil_vdev_offline(char *osname, void *arg)
 
1739
zil_vdev_offline(const char *osname, void *arg)
1651
1740
{
1652
1741
        objset_t *os;
1653
1742
        zilog_t *zilog;
1654
1743
        int error;
1655
1744
 
1656
 
        error = dmu_objset_open(osname, DMU_OST_ANY, DS_MODE_USER, &os);
 
1745
        error = dmu_objset_hold(osname, FTAG, &os);
1657
1746
        if (error)
1658
1747
                return (error);
1659
1748
 
1662
1751
                error = EEXIST;
1663
1752
        else
1664
1753
                zil_resume(zilog);
1665
 
        dmu_objset_close(os);
 
1754
        dmu_objset_rele(os, FTAG);
1666
1755
        return (error);
1667
1756
}