@pcd1193182
Created September 26, 2018 04:21
Diff between bottom half of dmu_send.c and dmu_recv.c
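
The bulk of the differences below come from two changes: dmu_recv.c carries its streaming state on the long-lived dmu_recv_cookie_t instead of a private struct receive_arg allocated inside dmu_recv_stream(), and it adds support for redacted send streams (the DRR_REDACT record type plus redaction compatibility checks in the begin code). A minimal sketch of the first change follows; the field names are taken from the diff itself, but the struct layout shown here is an illustrative assumption, not the verbatim header.

/*
 * Illustrative sketch only: per-receive state moves from the short-lived
 * struct receive_arg (freed at the end of dmu_recv_stream()) onto the
 * dmu_recv_cookie_t, so that dmu_recv_begin() can read and unpack the
 * DRR_BEGIN payload and dmu_recv_stream() can pick up from there.
 */
typedef struct dmu_recv_cookie {
	/* ... pre-existing cookie fields (drc_drrb, drc_tofs, ...) ... */
	vnode_t *drc_vp;		/* stream source, formerly ra->vp */
	offset_t drc_voff;		/* current offset in the stream */
	uint64_t drc_bytes_read;
	uint64_t drc_featureflags;	/* cached from drr_versioninfo */
	nvlist_t *drc_begin_nvl;	/* unpacked DRR_BEGIN payload */
	struct receive_record_arg *drc_rrd;	/* record being processed */
	struct receive_record_arg *drc_next_rrd; /* header read, payload pending */
	zio_cksum_t drc_cksum;
	zio_cksum_t drc_prev_cksum;
	objset_t *drc_os;
	struct objlist *drc_ignore_objlist;	/* objects not to prefetch */
} dmu_recv_cookie_t;

With the state on the cookie, receive_read(), receive_cksum(), receive_read_payload_and_next_header(), receive_read_record(), and resume_check() all take a dmu_recv_cookie_t * instead of a struct receive_arg *, which accounts for most of the mechanical churn in the hunks below.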
0a1,108
> /*
> * CDDL HEADER START
> *
> * The contents of this file are subject to the terms of the
> * Common Development and Distribution License (the "License").
> * You may not use this file except in compliance with the License.
> *
> * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
> * or http://www.opensolaris.org/os/licensing.
> * See the License for the specific language governing permissions
> * and limitations under the License.
> *
> * When distributing Covered Code, include this CDDL HEADER in each
> * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
> * If applicable, add the following below this CDDL HEADER, with the
> * fields enclosed by brackets "[]" replaced with your own identifying
> * information: Portions Copyright [yyyy] [name of copyright owner]
> *
> * CDDL HEADER END
> */
> /*
> * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
> * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
> * Copyright (c) 2011, 2018 by Delphix. All rights reserved.
> * Copyright (c) 2014, Joyent, Inc. All rights reserved.
> * Copyright 2014 HybridCluster. All rights reserved.
> */
>
> #include <sys/dmu.h>
> #include <sys/dmu_impl.h>
> #include <sys/dmu_send.h>
> #include <sys/dmu_recv.h>
> #include <sys/dmu_tx.h>
> #include <sys/dbuf.h>
> #include <sys/dnode.h>
> #include <sys/zfs_context.h>
> #include <sys/dmu_objset.h>
> #include <sys/dmu_traverse.h>
> #include <sys/dsl_dataset.h>
> #include <sys/dsl_dir.h>
> #include <sys/dsl_prop.h>
> #include <sys/dsl_pool.h>
> #include <sys/dsl_synctask.h>
> #include <sys/zfs_ioctl.h>
> #include <sys/zap.h>
> #include <sys/zvol.h>
> #include <sys/zio_checksum.h>
> #include <sys/zfs_znode.h>
> #include <zfs_fletcher.h>
> #include <sys/avl.h>
> #include <sys/ddt.h>
> #include <sys/zfs_onexit.h>
> #include <sys/dmu_send.h>
> #include <sys/dsl_destroy.h>
> #include <sys/blkptr.h>
> #include <sys/dsl_bookmark.h>
> #include <sys/zfeature.h>
> #include <sys/bqueue.h>
> #include <sys/objlist.h>
> #ifdef _KERNEL
> #include <sys/zfs_vfsops.h>
> #endif
>
> int zfs_recv_queue_length = SPA_MAXBLOCKSIZE;
> int zfs_recv_queue_ff = 20;
>
> static char *dmu_recv_tag = "dmu_recv_tag";
> const char *recv_clone_name = "%recv";
>
> static int receive_read_payload_and_next_header(dmu_recv_cookie_t *ra, int len,
> void *buf);
>
> struct receive_record_arg {
> dmu_replay_record_t header;
> void *payload; /* Pointer to a buffer containing the payload */
> /*
> * If the record is a write, pointer to the arc_buf_t containing the
> * payload.
> */
> arc_buf_t *arc_buf;
> int payload_size;
> uint64_t bytes_read; /* bytes read from stream when record created */
> boolean_t eos_marker; /* Marks the end of the stream */
> bqueue_node_t node;
> };
>
> struct receive_writer_arg {
> objset_t *os;
> boolean_t byteswap;
> bqueue_t q;
>
> /*
> * These three args are used to signal to the main thread that we're
> * done.
> */
> kmutex_t mutex;
> kcondvar_t cv;
> boolean_t done;
>
> int err;
> /* A map from guid to dataset to help handle dedup'd streams. */
> avl_tree_t *guid_to_ds_map;
> boolean_t resumable;
> boolean_t raw;
> uint64_t last_object;
> uint64_t last_offset;
> uint64_t max_object; /* highest object ID referenced in stream */
> uint64_t bytes_read; /* bytes read when current record created */
1a110,125
> /* Encryption parameters for the last received DRR_OBJECT_RANGE */
> boolean_t or_crypt_params_present;
> uint64_t or_firstobj;
> uint64_t or_numslots;
> uint8_t or_salt[ZIO_DATA_SALT_LEN];
> uint8_t or_iv[ZIO_DATA_IV_LEN];
> uint8_t or_mac[ZIO_DATA_MAC_LEN];
> boolean_t or_byteorder;
> };
>
> typedef struct guid_map_entry {
> uint64_t guid;
> boolean_t raw;
> dsl_dataset_t *gme_ds;
> avl_node_t avlnode;
> } guid_map_entry_t;
10a135,338
> static void
> byteswap_record(dmu_replay_record_t *drr)
> {
> #define DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
> #define DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
> drr->drr_type = BSWAP_32(drr->drr_type);
> drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);
>
> switch (drr->drr_type) {
> case DRR_BEGIN:
> DO64(drr_begin.drr_magic);
> DO64(drr_begin.drr_versioninfo);
> DO64(drr_begin.drr_creation_time);
> DO32(drr_begin.drr_type);
> DO32(drr_begin.drr_flags);
> DO64(drr_begin.drr_toguid);
> DO64(drr_begin.drr_fromguid);
> break;
> case DRR_OBJECT:
> DO64(drr_object.drr_object);
> DO32(drr_object.drr_type);
> DO32(drr_object.drr_bonustype);
> DO32(drr_object.drr_blksz);
> DO32(drr_object.drr_bonuslen);
> DO32(drr_object.drr_raw_bonuslen);
> DO64(drr_object.drr_toguid);
> DO64(drr_object.drr_maxblkid);
> break;
> case DRR_FREEOBJECTS:
> DO64(drr_freeobjects.drr_firstobj);
> DO64(drr_freeobjects.drr_numobjs);
> DO64(drr_freeobjects.drr_toguid);
> break;
> case DRR_WRITE:
> DO64(drr_write.drr_object);
> DO32(drr_write.drr_type);
> DO64(drr_write.drr_offset);
> DO64(drr_write.drr_logical_size);
> DO64(drr_write.drr_toguid);
> ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum);
> DO64(drr_write.drr_key.ddk_prop);
> DO64(drr_write.drr_compressed_size);
> break;
> case DRR_WRITE_BYREF:
> DO64(drr_write_byref.drr_object);
> DO64(drr_write_byref.drr_offset);
> DO64(drr_write_byref.drr_length);
> DO64(drr_write_byref.drr_toguid);
> DO64(drr_write_byref.drr_refguid);
> DO64(drr_write_byref.drr_refobject);
> DO64(drr_write_byref.drr_refoffset);
> ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref.
> drr_key.ddk_cksum);
> DO64(drr_write_byref.drr_key.ddk_prop);
> break;
> case DRR_WRITE_EMBEDDED:
> DO64(drr_write_embedded.drr_object);
> DO64(drr_write_embedded.drr_offset);
> DO64(drr_write_embedded.drr_length);
> DO64(drr_write_embedded.drr_toguid);
> DO32(drr_write_embedded.drr_lsize);
> DO32(drr_write_embedded.drr_psize);
> break;
> case DRR_FREE:
> DO64(drr_free.drr_object);
> DO64(drr_free.drr_offset);
> DO64(drr_free.drr_length);
> DO64(drr_free.drr_toguid);
> break;
> case DRR_SPILL:
> DO64(drr_spill.drr_object);
> DO64(drr_spill.drr_length);
> DO64(drr_spill.drr_toguid);
> DO64(drr_spill.drr_compressed_size);
> DO32(drr_spill.drr_type);
> break;
> case DRR_OBJECT_RANGE:
> DO64(drr_object_range.drr_firstobj);
> DO64(drr_object_range.drr_numslots);
> DO64(drr_object_range.drr_toguid);
> break;
> case DRR_REDACT:
> DO64(drr_redact.drr_object);
> DO64(drr_redact.drr_offset);
> DO64(drr_redact.drr_length);
> DO64(drr_redact.drr_toguid);
> break;
> case DRR_END:
> DO64(drr_end.drr_toguid);
> ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum);
> break;
> default:
> break;
> }
>
> if (drr->drr_type != DRR_BEGIN) {
> ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum);
> }
>
> #undef DO64
> #undef DO32
> }
>
> static boolean_t
> redact_snaps_contains(uint64_t *snaps, uint64_t num_snaps, uint64_t guid)
> {
> for (int i = 0; i < num_snaps; i++) {
> if (snaps[i] == guid)
> return (B_TRUE);
> }
> return (B_FALSE);
> }
>
> /*
> * Check that the new stream we're trying to receive is redacted with respect to
> * a subset of the snapshots that the origin was redacted with respect to. For
> * the reasons behind this, see the man page on redacted zfs sends and receives.
> */
> static boolean_t
> compatible_redact_snaps(uint64_t *origin_snaps, uint64_t origin_num_snaps,
> uint64_t *redact_snaps, uint64_t num_redact_snaps)
> {
> /*
> * Short circuit the comparison; if we are redacted with respect to
> * more snapshots than the origin, we can't be redacted with respect
> * to a subset.
> */
> if (num_redact_snaps > origin_num_snaps) {
> return (B_FALSE);
> }
>
> for (int i = 0; i < num_redact_snaps; i++) {
> if (!redact_snaps_contains(origin_snaps, origin_num_snaps,
> redact_snaps[i])) {
> return (B_FALSE);
> }
> }
> return (B_TRUE);
> }
>
> static boolean_t
> redact_check(dmu_recv_begin_arg_t *drba, dsl_dataset_t *origin)
> {
> uint64_t *origin_snaps;
> uint64_t origin_num_snaps;
> dmu_recv_cookie_t *drc = drba->drba_cookie;
> struct drr_begin *drrb = drc->drc_drrb;
> int featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
> int err = 0;
> boolean_t ret = B_TRUE;
> uint64_t *redact_snaps;
> uint_t numredactsnaps;
>
> /*
> * If this is a full send stream, we're safe no matter what.
> */
> if (drrb->drr_fromguid == 0)
> return (ret);
>
> VERIFY(dsl_dataset_get_uint64_array_feature(origin,
> SPA_FEATURE_REDACTED_DATASETS, &origin_num_snaps, &origin_snaps));
>
> if (nvlist_lookup_uint64_array(drc->drc_begin_nvl,
> BEGINNV_REDACT_FROM_SNAPS, &redact_snaps, &numredactsnaps) ==
> 0) {
> /*
> * If the send stream was sent from the redaction bookmark or
> * the redacted version of the dataset, then we're safe. Verify
> * that this is from a compatible redaction bookmark or
> * redacted dataset.
> */
> if (!compatible_redact_snaps(origin_snaps, origin_num_snaps,
> redact_snaps, numredactsnaps)) {
> err = EINVAL;
> }
> } else if (featureflags & DMU_BACKUP_FEATURE_REDACTED) {
> /*
> * If the stream is redacted, it must be redacted with respect
> * to a subset of what the origin is redacted with respect to.
> * See case number 2 in the zfs man page section on redacted zfs
> * send.
> */
> err = nvlist_lookup_uint64_array(drc->drc_begin_nvl,
> BEGINNV_REDACT_SNAPS, &redact_snaps, &numredactsnaps);
>
> if (err != 0 || !compatible_redact_snaps(origin_snaps,
> origin_num_snaps, redact_snaps, numredactsnaps)) {
> err = EINVAL;
> }
> } else if (!redact_snaps_contains(origin_snaps, origin_num_snaps,
> drrb->drr_toguid)) {
> /*
> * If the stream isn't redacted but the origin is, this must be
> * one of the snapshots the origin is redacted with respect to.
> * See case number 1 in the zfs man page section on redacted zfs
> * send.
> */
> err = EINVAL;
> }
>
> if (err != 0)
> ret = B_FALSE;
> return (ret);
> }
>
27c355
< return (error == 0 ? EBUSY : error);
---
> return (error == 0 ? SET_ERROR(EBUSY) : error);
34c362
< return (error == 0 ? EEXIST : error);
---
> return (error == 0 ? SET_ERROR(EEXIST) : error);
93a422,428
> if (dsl_dataset_feature_is_active(snap,
> SPA_FEATURE_REDACTED_DATASETS) && !redact_check(drba,
> snap)) {
> dsl_dataset_rele(snap, FTAG);
> return (SET_ERROR(EINVAL));
> }
>
125,126d459
<
< drba->drba_snapobj = 0;
132a466,512
> /*
> * Check that any feature flags used in the data stream we're receiving are
> * supported by the pool we are receiving into.
> *
> * Note that some of the features we explicitly check here have additional
> * (implicit) features they depend on, but those dependencies are enforced
> * through the zfeature_register() calls declaring the features that we
> * explicitly check.
> */
> static int
> recv_begin_check_feature_flags_impl(uint64_t featureflags, spa_t *spa)
> {
> /* Verify pool version supports SA if SA_SPILL feature set */
> if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
> spa_version(spa) < SPA_VERSION_SA)
> return (SET_ERROR(ENOTSUP));
>
> /*
> * LZ4 compressed, embedded, mooched, large blocks, and large_dnodes
> * in the stream can only be used if those pool features are enabled
> * because we don't attempt to decompress / un-embed / un-mooch /
> * split up the blocks / dnodes during the receive process.
> */
> if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
> !spa_feature_is_enabled(spa, SPA_FEATURE_LZ4_COMPRESS))
> return (SET_ERROR(ENOTSUP));
> if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
> !spa_feature_is_enabled(spa, SPA_FEATURE_EMBEDDED_DATA))
> return (SET_ERROR(ENOTSUP));
> if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
> !spa_feature_is_enabled(spa, SPA_FEATURE_LARGE_BLOCKS))
> return (SET_ERROR(ENOTSUP));
> if ((featureflags & DMU_BACKUP_FEATURE_LARGE_DNODE) &&
> !spa_feature_is_enabled(spa, SPA_FEATURE_LARGE_DNODE))
> return (SET_ERROR(ENOTSUP));
>
> /*
> * Receiving redacted streams requires that redacted datasets are
> * enabled.
> */
> if ((featureflags & DMU_BACKUP_FEATURE_REDACTED) &&
> !spa_feature_is_enabled(spa, SPA_FEATURE_REDACTED_DATASETS))
> return (SET_ERROR(ENOTSUP));
>
> return (0);
> }
>
143c523
< uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
---
> uint64_t featureflags = drba->drba_cookie->drc_featureflags;
157,160c537,539
< /* Verify pool version supports SA if SA_SPILL feature set */
< if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
< spa_version(dp->dp_spa) < SPA_VERSION_SA)
< return (SET_ERROR(ENOTSUP));
---
> error = recv_begin_check_feature_flags_impl(featureflags, dp->dp_spa);
> if (error != 0)
> return (error);
161a541
> /* Resumable receives require extensible datasets */
166,191d545
< /*
< * The receiving code doesn't know how to translate a WRITE_EMBEDDED
< * record to a plain WRITE record, so the pool must have the
< * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
< * records. Same with WRITE_EMBEDDED records that use LZ4 compression.
< */
< if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
< !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
< return (SET_ERROR(ENOTSUP));
< if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
< !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
< return (SET_ERROR(ENOTSUP));
<
< /*
< * The receiving code doesn't know how to translate large blocks
< * to smaller ones, so the pool must have the LARGE_BLOCKS
< * feature enabled if the stream has LARGE_BLOCKS. Same with
< * large dnodes.
< */
< if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
< !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
< return (SET_ERROR(ENOTSUP));
< if ((featureflags & DMU_BACKUP_FEATURE_LARGE_DNODE) &&
< !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_DNODE))
< return (SET_ERROR(ENOTSUP));
<
225c579
< if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
---
> if (fromguid != 0 && !((flags & DRR_FLAG_CLONE) ||
234c588
< if (fromguid == 0 && drba->drba_origin &&
---
> if (fromguid == 0 && drba->drba_origin != NULL &&
291d644
<
308a662
>
315,316c669,686
< dsl_dataset_rele_flags(origin,
< dsflags, FTAG);
---
>
> /*
> * If the origin is redacted we need to verify that this
> * send stream can safely be received on top of the
> * origin.
> */
> if (dsl_dataset_feature_is_active(origin,
> SPA_FEATURE_REDACTED_DATASETS)) {
> if (!redact_check(drba, origin)) {
> dsl_dataset_rele_flags(origin, dsflags,
> FTAG);
> dsl_dataset_rele_flags(ds, dsflags,
> FTAG);
> return (SET_ERROR(EINVAL));
> }
> }
>
> dsl_dataset_rele_flags(origin, dsflags, FTAG);
330,332c700,703
< struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
< const char *tofs = drba->drba_cookie->drc_tofs;
< uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
---
> dmu_recv_cookie_t *drc = drba->drba_cookie;
> struct drr_begin *drrb = drc->drc_drrb;
> const char *tofs = drc->drc_tofs;
> uint64_t featureflags = drc->drc_featureflags;
353c724
< if (dcp == NULL && drba->drba_snapobj == 0 &&
---
> if (dcp == NULL && drrb->drr_fromguid == 0 &&
372d742
<
397c767,780
< drba->drba_cookie->drc_newfs = B_TRUE;
---
> drc->drc_newfs = B_TRUE;
> }
> VERIFY0(dsl_dataset_own_obj_force(dp, dsobj, dsflags, dmu_recv_tag,
> &newds));
> if (dsl_dataset_feature_is_active(newds,
> SPA_FEATURE_REDACTED_DATASETS)) {
> /*
> * If the origin dataset is redacted, the child will be redacted
> * when we create it. We clear the new dataset's
> * redaction info; if it should be redacted, we'll fill
> * in its information later.
> */
> dsl_dataset_deactivate_feature(newds,
> SPA_FEATURE_REDACTED_DATASETS, tx);
399,400d781
<
< VERIFY0(dsl_dataset_own_obj(dp, dsobj, dsflags, dmu_recv_tag, &newds));
403c784
< if (drba->drba_cookie->drc_resumable) {
---
> if (drc->drc_resumable) {
436a818,828
>
> uint64_t *redact_snaps;
> uint_t numredactsnaps;
> if (nvlist_lookup_uint64_array(drc->drc_begin_nvl,
> BEGINNV_REDACT_FROM_SNAPS, &redact_snaps,
> &numredactsnaps) == 0) {
> VERIFY0(zap_add(mos, dsobj,
> DS_FIELD_RESUME_REDACT_BOOKMARK_SNAPS,
> sizeof (*redact_snaps), numredactsnaps,
> redact_snaps, tx));
> }
448a841,850
>
> if (featureflags & DMU_BACKUP_FEATURE_REDACTED) {
> uint64_t *redact_snaps;
> uint_t numredactsnaps;
> VERIFY0(nvlist_lookup_uint64_array(drc->drc_begin_nvl,
> BEGINNV_REDACT_SNAPS, &redact_snaps, &numredactsnaps));
> dsl_dataset_activate_redaction(newds, redact_snaps,
> numredactsnaps, tx);
> }
>
474a877
> dmu_recv_cookie_t *drc = drba->drba_cookie;
476c879
< struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
---
> struct drr_begin *drrb = drc->drc_drrb;
479d881
< uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
481c883
< const char *tofs = drba->drba_cookie->drc_tofs;
---
> const char *tofs = drc->drc_tofs;
485c887
< ASSERT(featureflags & DMU_BACKUP_FEATURE_RESUMING);
---
> ASSERT(drc->drc_featureflags & DMU_BACKUP_FEATURE_RESUMING);
492,496d893
< /* Verify pool version supports SA if SA_SPILL feature set */
< if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
< spa_version(dp->dp_spa) < SPA_VERSION_SA)
< return (SET_ERROR(ENOTSUP));
<
498,501c895,896
< * The receiving code doesn't know how to translate a WRITE_EMBEDDED
< * record to a plain WRITE record, so the pool must have the
< * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
< * records. Same with WRITE_EMBEDDED records that use LZ4 compression.
---
> * This is mostly a sanity check since we should have already done these
> * checks during a previous attempt to receive the data.
503,521c898,901
< if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
< !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
< return (SET_ERROR(ENOTSUP));
< if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
< !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
< return (SET_ERROR(ENOTSUP));
<
< /*
< * The receiving code doesn't know how to translate large blocks
< * to smaller ones, so the pool must have the LARGE_BLOCKS
< * feature enabled if the stream has LARGE_BLOCKS. Same with
< * large dnodes.
< */
< if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
< !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
< return (SET_ERROR(ENOTSUP));
< if ((featureflags & DMU_BACKUP_FEATURE_LARGE_DNODE) &&
< !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_DNODE))
< return (SET_ERROR(ENOTSUP));
---
> error = recv_begin_check_feature_flags_impl(drc->drc_featureflags,
> dp->dp_spa);
> if (error != 0)
> return (error);
524a905
>
528c909
< if ((featureflags & DMU_BACKUP_FEATURE_RAW) == 0)
---
> if ((drc->drc_featureflags & DMU_BACKUP_FEATURE_RAW) == 0)
586a968,1001
> /*
> * If we're resuming, and the send is redacted, then the original send
> * must have been redacted, and must have been redacted with respect to
> * the same snapshots.
> */
> if (drc->drc_featureflags & DMU_BACKUP_FEATURE_REDACTED) {
> uint64_t num_ds_redact_snaps;
> uint64_t *ds_redact_snaps;
>
> uint_t num_stream_redact_snaps;
> uint64_t *stream_redact_snaps;
>
> if (nvlist_lookup_uint64_array(drc->drc_begin_nvl,
> BEGINNV_REDACT_SNAPS, &stream_redact_snaps,
> &num_stream_redact_snaps) != 0) {
> dsl_dataset_rele_flags(ds, dsflags, FTAG);
> return (SET_ERROR(EINVAL));
> }
>
> if (!dsl_dataset_get_uint64_array_feature(ds,
> SPA_FEATURE_REDACTED_DATASETS, &num_ds_redact_snaps,
> &ds_redact_snaps)) {
> dsl_dataset_rele_flags(ds, dsflags, FTAG);
> return (SET_ERROR(EINVAL));
> }
>
> for (int i = 0; i < num_ds_redact_snaps; i++) {
> if (!redact_snaps_contains(ds_redact_snaps,
> num_ds_redact_snaps, stream_redact_snaps[i])) {
> dsl_dataset_rele_flags(ds, dsflags, FTAG);
> return (SET_ERROR(EINVAL));
> }
> }
> }
597,598c1012
< struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
< uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
---
> uint64_t featureflags = drba->drba_cookie->drc_featureflags;
600d1013
< objset_t *os;
602d1014
< uint64_t dsobj;
606,607c1018,1019
< (void) snprintf(recvname, sizeof (recvname), "%s/%s",
< tofs, recv_clone_name);
---
> (void) snprintf(recvname, sizeof (recvname), "%s/%s", tofs,
> recv_clone_name);
615c1027,1028
< if (dsl_dataset_hold_flags(dp, recvname, dsflags, FTAG, &ds) != 0) {
---
> if (dsl_dataset_own_force(dp, recvname, dsflags, dmu_recv_tag, &ds)
> != 0) {
617c1030,1031
< VERIFY0(dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds));
---
> VERIFY0(dsl_dataset_own_force(dp, tofs, dsflags, dmu_recv_tag,
> &ds));
621d1034
< /* clear the inconsistent flag so that we can own it */
623,633d1035
< dmu_buf_will_dirty(ds->ds_dbuf, tx);
< dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
< dsobj = ds->ds_object;
< dsl_dataset_rele_flags(ds, dsflags, FTAG);
<
< VERIFY0(dsl_dataset_own_obj(dp, dsobj, dsflags, dmu_recv_tag, &ds));
< VERIFY0(dmu_objset_from_ds(ds, &os));
<
< dmu_buf_will_dirty(ds->ds_dbuf, tx);
< dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT;
<
651c1053,1054
< nvlist_t *hidden_args, char *origin, dmu_recv_cookie_t *drc)
---
> nvlist_t *hidden_args, char *origin, dmu_recv_cookie_t *drc,
> vnode_t *vp, offset_t *voffp)
653a1057
> int err;
676a1081,1107
> drc->drc_vp = vp;
> drc->drc_voff = *voffp;
> drc->drc_featureflags =
> DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
>
> uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen;
> void *payload = NULL;
> if (payloadlen != 0)
> payload = kmem_alloc(payloadlen, KM_SLEEP);
>
> err = receive_read_payload_and_next_header(drc, payloadlen,
> payload);
> if (err != 0) {
> kmem_free(payload, payloadlen);
> return (err);
> }
> if (payloadlen != 0) {
> err = nvlist_unpack(payload, payloadlen, &drc->drc_begin_nvl,
> KM_SLEEP);
> kmem_free(payload, payloadlen);
> if (err != 0) {
> kmem_free(drc->drc_next_rrd,
> sizeof (*drc->drc_next_rrd));
> return (err);
> }
> }
>
681,683c1112,1113
< if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
< DMU_BACKUP_FEATURE_RESUMING) {
< return (dsl_sync_task(tofs,
---
> if (drc->drc_featureflags & DMU_BACKUP_FEATURE_RESUMING) {
> err = dsl_sync_task(tofs,
685,686c1115,1116
< &drba, 5, ZFS_SPACE_CHECK_NORMAL));
< } else {
---
> &drba, 5, ZFS_SPACE_CHECK_NORMAL);
> } else {
715,798d1144
< }
<
< struct receive_record_arg {
< dmu_replay_record_t header;
< void *payload; /* Pointer to a buffer containing the payload */
< /*
< * If the record is a write, pointer to the arc_buf_t containing the
< * payload.
< */
< arc_buf_t *arc_buf;
< int payload_size;
< uint64_t bytes_read; /* bytes read from stream when record created */
< boolean_t eos_marker; /* Marks the end of the stream */
< bqueue_node_t node;
< };
<
< struct receive_writer_arg {
< objset_t *os;
< boolean_t byteswap;
< bqueue_t q;
<
< /*
< * These three args are used to signal to the main thread that we're
< * done.
< */
< kmutex_t mutex;
< kcondvar_t cv;
< boolean_t done;
<
< int err;
< /* A map from guid to dataset to help handle dedup'd streams. */
< avl_tree_t *guid_to_ds_map;
< boolean_t resumable;
< boolean_t raw;
< uint64_t last_object;
< uint64_t last_offset;
< uint64_t max_object; /* highest object ID referenced in stream */
< uint64_t bytes_read; /* bytes read when current record created */
<
< /* Encryption parameters for the last received DRR_OBJECT_RANGE */
< boolean_t or_crypt_params_present;
< uint64_t or_firstobj;
< uint64_t or_numslots;
< uint8_t or_salt[ZIO_DATA_SALT_LEN];
< uint8_t or_iv[ZIO_DATA_IV_LEN];
< uint8_t or_mac[ZIO_DATA_MAC_LEN];
< boolean_t or_byteorder;
< };
<
< struct objlist {
< list_t list; /* List of struct receive_objnode. */
< /*
< * Last object looked up. Used to assert that objects are being looked
< * up in ascending order.
< */
< uint64_t last_lookup;
< };
<
< struct receive_objnode {
< list_node_t node;
< uint64_t object;
< };
<
< struct receive_arg {
< objset_t *os;
< vnode_t *vp; /* The vnode to read the stream from */
< uint64_t voff; /* The current offset in the stream */
< uint64_t bytes_read;
< /*
< * A record that has had its payload read in, but hasn't yet been handed
< * off to the worker thread.
< */
< struct receive_record_arg *rrd;
< /* A record that has had its header read in, but not its payload. */
< struct receive_record_arg *next_rrd;
< zio_cksum_t cksum;
< zio_cksum_t prev_cksum;
< int err;
< boolean_t byteswap;
< boolean_t raw;
< uint64_t featureflags;
< /* Sorted list of objects not to issue prefetches for. */
< struct objlist ignore_objlist;
< };
800,805c1146,1151
< typedef struct guid_map_entry {
< uint64_t guid;
< boolean_t raw;
< dsl_dataset_t *gme_ds;
< avl_node_t avlnode;
< } guid_map_entry_t;
---
> if (err != 0) {
> kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd));
> nvlist_free(drc->drc_begin_nvl);
> }
> return (err);
> }
839c1185
< receive_read(struct receive_arg *ra, int len, void *buf)
---
> receive_read(dmu_recv_cookie_t *drc, int len, void *buf)
848c1194
< (ra->featureflags & DMU_BACKUP_FEATURE_RAW) != 0);
---
> (drc->drc_featureflags & DMU_BACKUP_FEATURE_RAW) != 0);
853c1199
< ra->err = vn_rdwr(UIO_READ, ra->vp,
---
> drc->drc_err = vn_rdwr(UIO_READ, drc->drc_vp,
855c1201
< ra->voff, UIO_SYSSPACE, FAPPEND,
---
> drc->drc_voff, UIO_SYSSPACE, FAPPEND,
863c1209
< ra->err = SET_ERROR(ECKSUM);
---
> drc->drc_err = SET_ERROR(ECKSUM);
865c1211
< ra->voff += len - done - resid;
---
> drc->drc_voff += len - done - resid;
867,868c1213,1214
< if (ra->err != 0)
< return (ra->err);
---
> if (drc->drc_err != 0)
> return (drc->drc_err);
871c1217
< ra->bytes_read += len;
---
> drc->drc_bytes_read += len;
877,973d1222
< noinline static void
< byteswap_record(dmu_replay_record_t *drr)
< {
< #define DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
< #define DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
< drr->drr_type = BSWAP_32(drr->drr_type);
< drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);
<
< switch (drr->drr_type) {
< case DRR_BEGIN:
< DO64(drr_begin.drr_magic);
< DO64(drr_begin.drr_versioninfo);
< DO64(drr_begin.drr_creation_time);
< DO32(drr_begin.drr_type);
< DO32(drr_begin.drr_flags);
< DO64(drr_begin.drr_toguid);
< DO64(drr_begin.drr_fromguid);
< break;
< case DRR_OBJECT:
< DO64(drr_object.drr_object);
< DO32(drr_object.drr_type);
< DO32(drr_object.drr_bonustype);
< DO32(drr_object.drr_blksz);
< DO32(drr_object.drr_bonuslen);
< DO32(drr_object.drr_raw_bonuslen);
< DO64(drr_object.drr_toguid);
< DO64(drr_object.drr_maxblkid);
< break;
< case DRR_FREEOBJECTS:
< DO64(drr_freeobjects.drr_firstobj);
< DO64(drr_freeobjects.drr_numobjs);
< DO64(drr_freeobjects.drr_toguid);
< break;
< case DRR_WRITE:
< DO64(drr_write.drr_object);
< DO32(drr_write.drr_type);
< DO64(drr_write.drr_offset);
< DO64(drr_write.drr_logical_size);
< DO64(drr_write.drr_toguid);
< ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum);
< DO64(drr_write.drr_key.ddk_prop);
< DO64(drr_write.drr_compressed_size);
< break;
< case DRR_WRITE_BYREF:
< DO64(drr_write_byref.drr_object);
< DO64(drr_write_byref.drr_offset);
< DO64(drr_write_byref.drr_length);
< DO64(drr_write_byref.drr_toguid);
< DO64(drr_write_byref.drr_refguid);
< DO64(drr_write_byref.drr_refobject);
< DO64(drr_write_byref.drr_refoffset);
< ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref.
< drr_key.ddk_cksum);
< DO64(drr_write_byref.drr_key.ddk_prop);
< break;
< case DRR_WRITE_EMBEDDED:
< DO64(drr_write_embedded.drr_object);
< DO64(drr_write_embedded.drr_offset);
< DO64(drr_write_embedded.drr_length);
< DO64(drr_write_embedded.drr_toguid);
< DO32(drr_write_embedded.drr_lsize);
< DO32(drr_write_embedded.drr_psize);
< break;
< case DRR_FREE:
< DO64(drr_free.drr_object);
< DO64(drr_free.drr_offset);
< DO64(drr_free.drr_length);
< DO64(drr_free.drr_toguid);
< break;
< case DRR_SPILL:
< DO64(drr_spill.drr_object);
< DO64(drr_spill.drr_length);
< DO64(drr_spill.drr_toguid);
< DO64(drr_spill.drr_compressed_size);
< DO32(drr_spill.drr_type);
< break;
< case DRR_OBJECT_RANGE:
< DO64(drr_object_range.drr_firstobj);
< DO64(drr_object_range.drr_numslots);
< DO64(drr_object_range.drr_toguid);
< break;
< case DRR_END:
< DO64(drr_end.drr_toguid);
< ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum);
< break;
< default:
< break;
< }
<
< if (drr->drr_type != DRR_BEGIN) {
< ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum);
< }
<
< #undef DO64
< #undef DO32
< }
<
1045c1294
< (spa_maxdnodesize(dmu_objset_spa(rwa->os)) >> DNODE_SHIFT)) {
---
> (spa_maxdnodesize(dmu_objset_spa(rwa->os)) >> DNODE_SHIFT)) {
1070a1320
>
1102,1103c1352,1353
< err = dmu_free_long_range(rwa->os,
< drro->drr_object, 0, DMU_OBJECT_END);
---
> err = dmu_free_long_range(rwa->os, drro->drr_object,
> 0, DMU_OBJECT_END);
1301c1551,1552
< obj < drrfo->drr_firstobj + drrfo->drr_numobjs && next_err == 0;
---
> obj < drrfo->drr_firstobj + drrfo->drr_numobjs &&
> obj < DN_MAX_OBJECT && next_err == 0;
1316,1318d1566
<
< if (obj > rwa->max_object)
< rwa->max_object = obj;
1371a1620
> /* use the bonus buf to look up the dnode in dmu_assign_arcbuf */
1395c1644
< static int
---
> noinline static int
1515d1763
< uint32_t flags = 0;
1526,1527d1773
<
< flags |= DMU_READ_NO_DECRYPT;
1556,1557c1802
< VERIFY(0 == dbuf_spill_set_blksz(db_spill,
< drrs->drr_length, tx));
---
> VERIFY0(dbuf_spill_set_blksz(db_spill, drrs->drr_length, tx));
1582c1827
< if (drrf->drr_length != DMU_OBJECT_END &&
---
> if (drrf->drr_length != -1ULL &&
1646a1892,1907
> /*
> * Until we have the ability to redact large ranges of data efficiently, we
> * process these records as frees.
> */
> /* ARGSUSED */
> noinline static int
> receive_redact(struct receive_writer_arg *rwa, struct drr_redact *drrr)
> {
> struct drr_free drrf = {0};
> drrf.drr_length = drrr->drr_length;
> drrf.drr_object = drrr->drr_object;
> drrf.drr_offset = drrr->drr_offset;
> drrf.drr_toguid = drrr->drr_toguid;
> return (receive_free(rwa, &drrf));
> }
>
1678c1939
< receive_cksum(struct receive_arg *ra, int len, void *buf)
---
> receive_cksum(dmu_recv_cookie_t *drc, int len, void *buf)
1680,1681c1941,1943
< if (ra->byteswap) {
< (void) fletcher_4_incremental_byteswap(buf, len, &ra->cksum);
---
> if (drc->drc_byteswap) {
> (void) fletcher_4_incremental_byteswap(buf, len,
> &drc->drc_cksum);
1683c1945
< (void) fletcher_4_incremental_native(buf, len, &ra->cksum);
---
> (void) fletcher_4_incremental_native(buf, len, &drc->drc_cksum);
1690,1691c1952,1953
< * Allocate ra->next_rrd and read the next record's header into
< * ra->next_rrd->header.
---
> * Allocate drc->drc_next_rrd and read the next record's header into
> * drc->drc_next_rrd->header.
1695c1957
< receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf)
---
> receive_read_payload_and_next_header(dmu_recv_cookie_t *drc, int len, void *buf)
1698,1699d1959
< zio_cksum_t cksum_orig;
< zio_cksum_t *cksump;
1703c1963
< err = receive_read(ra, len, buf);
---
> err = receive_read(drc, len, buf);
1706c1966
< receive_cksum(ra, len, buf);
---
> receive_cksum(drc, len, buf);
1709,1712c1969,1972
< if (ra->rrd != NULL) {
< ra->rrd->payload = buf;
< ra->rrd->payload_size = len;
< ra->rrd->bytes_read = ra->bytes_read;
---
> if (drc->drc_rrd != NULL) {
> drc->drc_rrd->payload = buf;
> drc->drc_rrd->payload_size = len;
> drc->drc_rrd->bytes_read = drc->drc_bytes_read;
1716c1976
< ra->prev_cksum = ra->cksum;
---
> drc->drc_prev_cksum = drc->drc_cksum;
1718,1721c1978,1981
< ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
< err = receive_read(ra, sizeof (ra->next_rrd->header),
< &ra->next_rrd->header);
< ra->next_rrd->bytes_read = ra->bytes_read;
---
> drc->drc_next_rrd = kmem_zalloc(sizeof (*drc->drc_next_rrd), KM_SLEEP);
> err = receive_read(drc, sizeof (drc->drc_next_rrd->header),
> &drc->drc_next_rrd->header);
> drc->drc_next_rrd->bytes_read = drc->drc_bytes_read;
1724,1725c1984,1985
< kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
< ra->next_rrd = NULL;
---
> kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd));
> drc->drc_next_rrd = NULL;
1728,1730c1988,1990
< if (ra->next_rrd->header.drr_type == DRR_BEGIN) {
< kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
< ra->next_rrd = NULL;
---
> if (drc->drc_next_rrd->header.drr_type == DRR_BEGIN) {
> kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd));
> drc->drc_next_rrd = NULL;
1740c2000
< receive_cksum(ra,
---
> receive_cksum(drc,
1742c2002
< &ra->next_rrd->header);
---
> &drc->drc_next_rrd->header);
1744,1745c2004,2007
< cksum_orig = ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
< cksump = &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
---
> zio_cksum_t cksum_orig =
> drc->drc_next_rrd->header.drr_u.drr_checksum.drr_checksum;
> zio_cksum_t *cksump =
> &drc->drc_next_rrd->header.drr_u.drr_checksum.drr_checksum;
1747,1748c2009,2010
< if (ra->byteswap)
< byteswap_record(&ra->next_rrd->header);
---
> if (drc->drc_byteswap)
> byteswap_record(&drc->drc_next_rrd->header);
1751,1753c2013,2015
< !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) {
< kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
< ra->next_rrd = NULL;
---
> !ZIO_CHECKSUM_EQUAL(drc->drc_cksum, *cksump)) {
> kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd));
> drc->drc_next_rrd = NULL;
1757c2019
< receive_cksum(ra, sizeof (cksum_orig), &cksum_orig);
---
> receive_cksum(drc, sizeof (cksum_orig), &cksum_orig);
1762,1823d2023
< static void
< objlist_create(struct objlist *list)
< {
< list_create(&list->list, sizeof (struct receive_objnode),
< offsetof(struct receive_objnode, node));
< list->last_lookup = 0;
< }
<
< static void
< objlist_destroy(struct objlist *list)
< {
< for (struct receive_objnode *n = list_remove_head(&list->list);
< n != NULL; n = list_remove_head(&list->list)) {
< kmem_free(n, sizeof (*n));
< }
< list_destroy(&list->list);
< }
<
< /*
< * This function looks through the objlist to see if the specified object number
< * is contained in the objlist. In the process, it will remove all object
< * numbers in the list that are smaller than the specified object number. Thus,
< * any lookup of an object number smaller than a previously looked up object
< * number will always return false; therefore, all lookups should be done in
< * ascending order.
< */
< static boolean_t
< objlist_exists(struct objlist *list, uint64_t object)
< {
< struct receive_objnode *node = list_head(&list->list);
< ASSERT3U(object, >=, list->last_lookup);
< list->last_lookup = object;
< while (node != NULL && node->object < object) {
< VERIFY3P(node, ==, list_remove_head(&list->list));
< kmem_free(node, sizeof (*node));
< node = list_head(&list->list);
< }
< return (node != NULL && node->object == object);
< }
<
< /*
< * The objlist is a list of object numbers stored in ascending order. However,
< * the insertion of new object numbers does not seek out the correct location to
< * store a new object number; instead, it appends it to the list for simplicity.
< * Thus, any users must take care to only insert new object numbers in ascending
< * order.
< */
< static void
< objlist_insert(struct objlist *list, uint64_t object)
< {
< struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP);
< node->object = object;
< #ifdef ZFS_DEBUG
< {
< struct receive_objnode *last_object = list_tail(&list->list);
< uint64_t last_objnum = (last_object != NULL ? last_object->object : 0);
< ASSERT3U(node->object, >, last_objnum);
< }
< #endif
< list_insert_tail(&list->list, node);
< }
<
1843,1844c2043,2044
< receive_read_prefetch(struct receive_arg *ra,
< uint64_t object, uint64_t offset, uint64_t length)
---
> receive_read_prefetch(dmu_recv_cookie_t *drc, uint64_t object, uint64_t offset,
> uint64_t length)
1846,1847c2046,2047
< if (!objlist_exists(&ra->ignore_objlist, object)) {
< dmu_prefetch(ra->os, object, 1, offset, length,
---
> if (!objlist_exists(drc->drc_ignore_objlist, object)) {
> dmu_prefetch(drc->drc_os, object, 1, offset, length,
1856c2056
< receive_read_record(struct receive_arg *ra)
---
> receive_read_record(dmu_recv_cookie_t *drc)
1860c2060
< switch (ra->rrd->header.drr_type) {
---
> switch (drc->drc_rrd->header.drr_type) {
1863c2063,2064
< struct drr_object *drro = &ra->rrd->header.drr_u.drr_object;
---
> struct drr_object *drro =
> &drc->drc_rrd->header.drr_u.drr_object;
1868c2069
< err = receive_read_payload_and_next_header(ra, size, buf);
---
> err = receive_read_payload_and_next_header(drc, size, buf);
1873c2074
< err = dmu_object_info(ra->os, drro->drr_object, &doi);
---
> err = dmu_object_info(drc->drc_os, drro->drr_object, &doi);
1880c2081,2082
< objlist_insert(&ra->ignore_objlist, drro->drr_object);
---
> objlist_insert(drc->drc_ignore_objlist,
> drro->drr_object);
1887c2089
< err = receive_read_payload_and_next_header(ra, 0, NULL);
---
> err = receive_read_payload_and_next_header(drc, 0, NULL);
1892c2094
< struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write;
---
> struct drr_write *drrw = &drc->drc_rrd->header.drr_u.drr_write;
1896c2098
< if (ra->raw) {
---
> if (drc->drc_raw) {
1899c2101
< ra->byteswap;
---
> drc->drc_byteswap;
1901c2103
< abuf = arc_loan_raw_buf(dmu_objset_spa(ra->os),
---
> abuf = arc_loan_raw_buf(dmu_objset_spa(drc->drc_os),
1912c2114
< dmu_objset_spa(ra->os),
---
> dmu_objset_spa(drc->drc_os),
1916c2118
< abuf = arc_loan_buf(dmu_objset_spa(ra->os),
---
> abuf = arc_loan_buf(dmu_objset_spa(drc->drc_os),
1920c2122
< err = receive_read_payload_and_next_header(ra,
---
> err = receive_read_payload_and_next_header(drc,
1926,1927c2128,2129
< ra->rrd->arc_buf = abuf;
< receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset,
---
> drc->drc_rrd->arc_buf = abuf;
> receive_read_prefetch(drc, drrw->drr_object, drrw->drr_offset,
1934,1936c2136,2138
< &ra->rrd->header.drr_u.drr_write_byref;
< err = receive_read_payload_and_next_header(ra, 0, NULL);
< receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset,
---
> &drc->drc_rrd->header.drr_u.drr_write_byref;
> err = receive_read_payload_and_next_header(drc, 0, NULL);
> receive_read_prefetch(drc, drrwb->drr_object, drrwb->drr_offset,
1943c2145
< &ra->rrd->header.drr_u.drr_write_embedded;
---
> &drc->drc_rrd->header.drr_u.drr_write_embedded;
1947c2149
< err = receive_read_payload_and_next_header(ra, size, buf);
---
> err = receive_read_payload_and_next_header(drc, size, buf);
1953c2155
< receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset,
---
> receive_read_prefetch(drc, drrwe->drr_object, drrwe->drr_offset,
1957a2160
> case DRR_REDACT:
1963c2166
< err = receive_read_payload_and_next_header(ra, 0, NULL);
---
> err = receive_read_payload_and_next_header(drc, 0, NULL);
1968,1969c2171,2173
< struct drr_end *drre = &ra->rrd->header.drr_u.drr_end;
< if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum))
---
> struct drr_end *drre = &drc->drc_rrd->header.drr_u.drr_end;
> if (!ZIO_CHECKSUM_EQUAL(drc->drc_prev_cksum,
> drre->drr_checksum))
1975c2179
< struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill;
---
> struct drr_spill *drrs = &drc->drc_rrd->header.drr_u.drr_spill;
1977,1978d2180
< int len = DRR_SPILL_PAYLOAD_SIZE(drrs);
<
1980c2182
< if (ra->raw) {
---
> if (drc->drc_raw) {
1983c2185
< ra->byteswap;
---
> drc->drc_byteswap;
1985,1986c2187,2188
< abuf = arc_loan_raw_buf(dmu_objset_spa(ra->os),
< dmu_objset_id(ra->os), byteorder, drrs->drr_salt,
---
> abuf = arc_loan_raw_buf(dmu_objset_spa(drc->drc_os),
> drrs->drr_object, byteorder, drrs->drr_salt,
1991c2193
< abuf = arc_loan_buf(dmu_objset_spa(ra->os),
---
> abuf = arc_loan_buf(dmu_objset_spa(drc->drc_os),
1995,1998c2197,2199
<
< err = receive_read_payload_and_next_header(ra, len,
< abuf->b_data);
< if (err != 0) {
---
> err = receive_read_payload_and_next_header(drc,
> DRR_SPILL_PAYLOAD_SIZE(drrs), abuf->b_data);
> if (err != 0)
2000,2002c2201,2202
< return (err);
< }
< ra->rrd->arc_buf = abuf;
---
> else
> drc->drc_rrd->arc_buf = abuf;
2007c2207
< err = receive_read_payload_and_next_header(ra, 0, NULL);
---
> err = receive_read_payload_and_next_header(drc, 0, NULL);
2008a2209
>
2014a2216,2217
>
>
2168d2370
< /* if receive_spill() is successful, it consumes the arc_buf */
2179c2381,2388
< return (receive_object_range(rwa, drror));
---
> err = receive_object_range(rwa, drror);
> break;
> }
> case DRR_REDACT:
> {
> struct drr_redact *drrr = &rrd->header.drr_u.drr_redact;
> err = receive_redact(rwa, drrr);
> break;
2231c2440
< resume_check(struct receive_arg *ra, nvlist_t *begin_nvl)
---
> resume_check(dmu_recv_cookie_t *drc, nvlist_t *begin_nvl)
2234,2235c2443,2444
< objset_t *mos = dmu_objset_pool(ra->os)->dp_meta_objset;
< uint64_t dsobj = dmu_objset_id(ra->os);
---
> objset_t *mos = dmu_objset_pool(drc->drc_os)->dp_meta_objset;
> uint64_t dsobj = dmu_objset_id(drc->drc_os);
2269,2270c2478,2479
< dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp,
< int cleanup_fd, uint64_t *action_handlep)
---
> dmu_recv_stream(dmu_recv_cookie_t *drc, int cleanup_fd,
> uint64_t *action_handlep, offset_t *voffp)
2273,2287c2482
< struct receive_arg *ra;
< struct receive_writer_arg *rwa;
< int featureflags;
< uint32_t payloadlen;
< void *payload;
< nvlist_t *begin_nvl = NULL;
<
< ra = kmem_zalloc(sizeof (*ra), KM_SLEEP);
< rwa = kmem_zalloc(sizeof (*rwa), KM_SLEEP);
<
< ra->byteswap = drc->drc_byteswap;
< ra->raw = drc->drc_raw;
< ra->cksum = drc->drc_cksum;
< ra->vp = vp;
< ra->voff = *voffp;
---
> struct receive_writer_arg *rwa = kmem_zalloc(sizeof (*rwa), KM_SLEEP);
2289a2485
> uint64_t bytes;
2292c2488,2489
< sizeof (ra->bytes_read), 1, &ra->bytes_read);
---
> sizeof (bytes), 1, &bytes);
> drc->drc_bytes_read += bytes;
2295c2492
< objlist_create(&ra->ignore_objlist);
---
> drc->drc_ignore_objlist = objlist_create();
2305,2306c2502
< VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra->os));
<
---
> VERIFY0(dmu_objset_from_ds(drc->drc_ds, &drc->drc_os));
2308,2313c2504,2505
<
< featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
< ra->featureflags = featureflags;
<
< ASSERT0(ra->os->os_encrypted &&
< (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA));
---
> ASSERT0(drc->drc_os->os_encrypted &&
> (drc->drc_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA));
2316c2508
< if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
---
> if (drc->drc_featureflags & DMU_BACKUP_FEATURE_DEDUP) {
2350,2367d2541
< payloadlen = drc->drc_drr_begin->drr_payloadlen;
< payload = NULL;
< if (payloadlen != 0)
< payload = kmem_alloc(payloadlen, KM_SLEEP);
<
< err = receive_read_payload_and_next_header(ra, payloadlen, payload);
< if (err != 0) {
< if (payloadlen != 0)
< kmem_free(payload, payloadlen);
< goto out;
< }
< if (payloadlen != 0) {
< err = nvlist_unpack(payload, payloadlen, &begin_nvl, KM_SLEEP);
< kmem_free(payload, payloadlen);
< if (err != 0)
< goto out;
< }
<
2369c2543
< if (featureflags & DMU_BACKUP_FEATURE_RAW) {
---
> if (drc->drc_featureflags & DMU_BACKUP_FEATURE_RAW) {
2372c2546
< ASSERT(ra->os->os_encrypted);
---
> ASSERT(drc->drc_os->os_encrypted);
2375c2549,2550
< err = nvlist_lookup_nvlist(begin_nvl, "crypt_keydata", &keynvl);
---
> err = nvlist_lookup_nvlist(drc->drc_begin_nvl, "crypt_keydata",
> &keynvl);
2379,2385c2554
< /*
< * If this is a new dataset we set the key immediately.
< * Otherwise we don't want to change the key until we
< * are sure the rest of the receive succeeded so we stash
< * the keynvl away until then.
< */
< err = dsl_crypto_recv_raw(spa_name(ra->os->os_spa),
---
> err = dsl_crypto_recv_raw(spa_name(drc->drc_os->os_spa),
2395,2396c2564,2565
< if (featureflags & DMU_BACKUP_FEATURE_RESUMING) {
< err = resume_check(ra, begin_nvl);
---
> if (drc->drc_featureflags & DMU_BACKUP_FEATURE_RESUMING) {
> err = resume_check(drc, drc->drc_begin_nvl);
2401c2570
< (void) bqueue_init(&rwa->q,
---
> (void) bqueue_init(&rwa->q, zfs_recv_queue_ff,
2406c2575,2576
< rwa->os = ra->os;
---
> rwa->os = drc->drc_os;
> rwa->os->os_raw_receive = drc->drc_raw;
2410d2579
< rwa->os->os_raw_receive = drc->drc_raw;
2424,2427c2593,2596
< * first loop and ra->rrd was never allocated, or it's later and ra->rrd
< * has been handed off to the writer thread who will free it. Finally,
< * if receive_read_record fails or we're at the end of the stream, then
< * we free ra->rrd and exit.
---
> * first loop and drc->drc_rrd was never allocated, or it's later, and
> * drc->drc_rrd has been handed off to the writer thread who will free
> * it. Finally, if receive_read_record fails or we're at the end of the
> * stream, then we free drc->drc_rrd and exit.
2435,2443c2604,2612
< ASSERT3P(ra->rrd, ==, NULL);
< ra->rrd = ra->next_rrd;
< ra->next_rrd = NULL;
< /* Allocates and loads header into ra->next_rrd */
< err = receive_read_record(ra);
<
< if (ra->rrd->header.drr_type == DRR_END || err != 0) {
< kmem_free(ra->rrd, sizeof (*ra->rrd));
< ra->rrd = NULL;
---
> ASSERT3P(drc->drc_rrd, ==, NULL);
> drc->drc_rrd = drc->drc_next_rrd;
> drc->drc_next_rrd = NULL;
> /* Allocates and loads header into drc->drc_next_rrd */
> err = receive_read_record(drc);
>
> if (drc->drc_rrd->header.drr_type == DRR_END || err != 0) {
> kmem_free(drc->drc_rrd, sizeof (*drc->drc_rrd));
> drc->drc_rrd = NULL;
2447,2454c2616,2626
< bqueue_enqueue(&rwa->q, ra->rrd,
< sizeof (struct receive_record_arg) + ra->rrd->payload_size);
< ra->rrd = NULL;
< }
< if (ra->next_rrd == NULL)
< ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
< ra->next_rrd->eos_marker = B_TRUE;
< bqueue_enqueue(&rwa->q, ra->next_rrd, 1);
---
> bqueue_enqueue(&rwa->q, drc->drc_rrd,
> sizeof (struct receive_record_arg) +
> drc->drc_rrd->payload_size);
> drc->drc_rrd = NULL;
> }
> if (drc->drc_next_rrd == NULL) {
> drc->drc_next_rrd = kmem_zalloc(sizeof (*drc->drc_next_rrd),
> KM_SLEEP);
> }
> drc->drc_next_rrd->eos_marker = B_TRUE;
> bqueue_enqueue_flush(&rwa->q, drc->drc_next_rrd, 1);
2458c2630,2634
< cv_wait(&rwa->cv, &rwa->mutex);
---
> /*
> * We need to use cv_wait_sig() so that any process that may
> * be sleeping here can still fork.
> */
> (void) cv_wait_sig(&rwa->cv, &rwa->mutex);
2495,2496c2671,2674
< nvlist_free(begin_nvl);
< if ((featureflags & DMU_BACKUP_FEATURE_DEDUP) && (cleanup_fd != -1))
---
> kmem_free(rwa, sizeof (*rwa));
> nvlist_free(drc->drc_begin_nvl);
> if ((drc->drc_featureflags & DMU_BACKUP_FEATURE_DEDUP) &&
> (cleanup_fd != -1))
2509,2512c2687,2689
< *voffp = ra->voff;
< objlist_destroy(&ra->ignore_objlist);
< kmem_free(ra, sizeof (*ra));
< kmem_free(rwa, sizeof (*rwa));
---
> objlist_destroy(drc->drc_ignore_objlist);
> drc->drc_ignore_objlist = NULL;
> *voffp = drc->drc_voff;
2637c2814,2815
< VERIFY3P(drc->drc_ds->ds_prev, ==, origin_head->ds_prev);
---
> VERIFY3P(drc->drc_ds->ds_prev, ==,
> origin_head->ds_prev);
2693a2872,2873
> (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
> DS_FIELD_RESUME_REDACT_BOOKMARK_SNAPS, tx);
2733a2914
>
2822a3004,3011
>
> #if defined(_KERNEL)
> module_param(zfs_recv_queue_length, int, 0644);
> MODULE_PARM_DESC(zfs_recv_queue_length, "Maximum receive queue length");
>
> module_param(zfs_recv_queue_ff, int, 0644);
> MODULE_PARM_DESC(zfs_recv_queue_ff, "Receive queue fill fraction");
> #endif