
Retrofitting arweave-js for Streaming Uploads

NOTE: The code documented in this gist is now available as a package here.

Since the release of the v2 transaction format, Arweave theoretically supports data transactions of unbounded size. However, common interfaces for creating Arweave transactions, such as arweave-js, are limited in the size of transactions they can create because their implementations require all transaction data to be eagerly loaded into memory. This caps transaction size at the runtime's memory limit (e.g. roughly 2GB for Node) and prevents Arweave's capabilities from being fully utilised.

This document outlines some work to make arweave-js support creating transactions in a streaming manner, unbounded by runtime memory limitations.

Beyond Scope:

  • Browser support: Browsers and NodeJS do not share a common Streams API. A browser implementation of the below using this should not differ too much, though. Additionally, shims like this could help.

Requirements:

  • Node 15
  • stream-chunker
  • exponential-backoff

Implementation

This work introduces the generateTransactionChunksAsync() method, which is used in place of arweave-js's generateTransactionChunks() to generate the Merkle root and chunk ranges for the transaction to be created.

import Arweave from 'arweave';
import {
  buildLayers,
  Chunk,
  generateLeaves,
  generateProofs,
  MAX_CHUNK_SIZE,
  MIN_CHUNK_SIZE,
} from 'arweave/node/lib/merkle';
import Transaction from 'arweave/node/lib/transaction';
import chunker from 'stream-chunker';
// @ts-ignore
import { pipeline } from 'stream/promises';

/**
 * Generates the Arweave transaction chunk information from the piped data stream.
 */
export function generateTransactionChunksAsync() {
  return async (source: AsyncIterable<Buffer>): Promise<Transaction['chunks']> => {
    const chunks: Chunk[] = [];

    /**
     * @param chunkByteIndex the byte index at which the specified chunk starts within the original data stream.
     */
    async function addChunk(chunkByteIndex: number, chunk: Buffer): Promise<Chunk> {
      const dataHash = await Arweave.crypto.hash(chunk);

      const chunkRep = {
        dataHash,
        minByteRange: chunkByteIndex,
        maxByteRange: chunkByteIndex + chunk.byteLength,
      };

      chunks.push(chunkRep);

      return chunkRep;
    }

    let chunkStreamByteIndex = 0;
    let previousDataChunk: Buffer | undefined;
    let expectChunkGenerationCompleted = false;

    await pipeline(
      source,
      chunker(MAX_CHUNK_SIZE, { flush: true }),
      async function (chunkedSource: AsyncIterable<Buffer>) {
        for await (const chunk of chunkedSource) {
          if (expectChunkGenerationCompleted) {
            throw Error('Expected chunk generation to have completed.');
          }

          if (chunk.byteLength >= MIN_CHUNK_SIZE && chunk.byteLength <= MAX_CHUNK_SIZE) {
            await addChunk(chunkStreamByteIndex, chunk);
          } else if (chunk.byteLength < MIN_CHUNK_SIZE) {
            // TODO: Add tests to explicitly test this condition.

            if (previousDataChunk) {
              // If this final chunk is smaller than the minimum chunk size, rebalance this final chunk and
              // the previous chunk to keep the final chunk size above the minimum threshold.
              const remainingBytes = Buffer.concat(
                [previousDataChunk, chunk],
                previousDataChunk.byteLength + chunk.byteLength,
              );
              const rebalancedSizeForPreviousChunk = Math.ceil(remainingBytes.byteLength / 2);

              const previousChunk = chunks.pop()!;
              const rebalancedPreviousChunk = await addChunk(
                previousChunk.minByteRange,
                remainingBytes.slice(0, rebalancedSizeForPreviousChunk),
              );

              await addChunk(
                rebalancedPreviousChunk.maxByteRange,
                remainingBytes.slice(rebalancedSizeForPreviousChunk),
              );
            } else {
              // The entire data stream is smaller than the minimum chunk size, so just add the chunk as-is.
              await addChunk(chunkStreamByteIndex, chunk);
            }

            expectChunkGenerationCompleted = true;
          } else if (chunk.byteLength > MAX_CHUNK_SIZE) {
            throw Error('Encountered chunk larger than max chunk size.');
          }

          chunkStreamByteIndex += chunk.byteLength;
          previousDataChunk = chunk;
        }
      },
    );

    const leaves = await generateLeaves(chunks);
    const root = await buildLayers(leaves);
    const proofs = await generateProofs(root);

    return {
      data_root: root.id,
      chunks,
      proofs,
    };
  };
}
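
A minimal usage sketch of the above (the file path here is a hypothetical placeholder): the data stream is piped into generateTransactionChunksAsync() with pipeline(), which resolves with the computed chunk information.

import { createReadStream } from 'fs';
// @ts-ignore
import { pipeline } from 'stream/promises';
import { generateTransactionChunksAsync } from './generate-transaction-chunks-async';

async function computeChunks() {
  // `pipeline()` resolves with the value returned by the final async function,
  // i.e. the `Transaction['chunks']` object produced above.
  const chunks = await pipeline(
    createReadStream('./large-file.bin'),
    generateTransactionChunksAsync(),
  );

  console.log(`Computed ${chunks.chunks.length} chunk(s) with data_root:`, chunks.data_root);
}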

It also introduces a higher-level API, createTransactionAsync(), which is used in place of arweave.createTransaction().

import Arweave from 'arweave';
import { CreateTransactionInterface } from 'arweave/node/common';
import Transaction, { TransactionInterface } from 'arweave/node/lib/transaction';
import { bufferTob64Url } from 'arweave/node/lib/utils';
import { JWKInterface } from 'arweave/node/lib/wallet';
// @ts-ignore
import { pipeline } from 'stream/promises';
import { generateTransactionChunksAsync } from './generate-transaction-chunks-async';

/**
 * Creates an Arweave transaction from the piped data stream.
 */
export function createTransactionAsync(
  attributes: Partial<Omit<CreateTransactionInterface, 'data'>>,
  arweave: Arweave,
  jwk: JWKInterface | null | undefined,
) {
  return async function (source: AsyncIterable<Buffer>): Promise<Transaction> {
    const chunks = await pipeline(source, generateTransactionChunksAsync());

    const txAttrs = Object.assign({}, attributes);

    txAttrs.owner ??= jwk?.n;
    txAttrs.last_tx ??= await arweave.transactions.getTransactionAnchor();

    const lastChunk = chunks.chunks[chunks.chunks.length - 1];
    const dataByteLength = lastChunk.maxByteRange;

    txAttrs.reward ??= await arweave.transactions.getPrice(dataByteLength, txAttrs.target);

    txAttrs.data_size = dataByteLength.toString();

    const tx = new Transaction(txAttrs as TransactionInterface);

    tx.chunks = chunks;
    tx.data_root = bufferTob64Url(chunks.data_root);

    return tx;
  };
}

Finally, this work introduces the uploadTransactionAsync() method, which allows Arweave transactions to be seeded in a streaming manner. Its implementation is derived from the TransactionUploader class in arweave-js.

import Arweave from 'arweave';
import { MAX_CHUNK_SIZE, validatePath } from 'arweave/node/lib/merkle';
import Transaction from 'arweave/node/lib/transaction';
import { b64UrlToBuffer, bufferTob64Url } from 'arweave/node/lib/utils';
import { backOff } from 'exponential-backoff';
import chunker from 'stream-chunker';
// @ts-ignore
import { pipeline } from 'stream/promises';

// Copied from `arweave-js`.
const FATAL_CHUNK_UPLOAD_ERRORS = [
  'invalid_json',
  'chunk_too_big',
  'data_path_too_big',
  'offset_too_big',
  'data_size_too_big',
  'chunk_proof_ratio_not_attractive',
  'invalid_proof',
];

interface ChunkUploadPayload {
  data_root: string;
  data_size: string;
  data_path: string;
  offset: string;
  chunk: string;
}

const MAX_CONCURRENT_CHUNK_UPLOAD_COUNT = 128;

/**
 * Uploads the piped data to the specified transaction.
 *
 * @param createTx whether or not the passed transaction should be created on the network.
 * This can be false if we want to reseed an existing transaction.
 */
export function uploadTransactionAsync(tx: Transaction, arweave: Arweave, createTx = true) {
  return async (source: AsyncIterable<Buffer>): Promise<void> => {
    if (!tx.chunks) {
      throw Error('Transaction has no computed chunks!');
    }

    if (createTx) {
      // Ensure the transaction data field is blank.
      // We'll upload this data in chunks instead.
      tx.data = new Uint8Array(0);

      const createTxRes = await arweave.api.post(`tx`, tx);
      if (!(createTxRes.status >= 200 && createTxRes.status < 300)) {
        throw new Error(`Failed to create transaction: ${createTxRes.data}`);
      }
    }

    const txChunkData = tx.chunks;
    const { chunks, proofs } = txChunkData;

    function prepareChunkUploadPayload(chunkIndex: number, chunkData: Buffer): ChunkUploadPayload {
      const proof = proofs[chunkIndex];
      return {
        data_root: tx.data_root,
        data_size: tx.data_size,
        data_path: bufferTob64Url(proof.proof),
        offset: proof.offset.toString(),
        chunk: bufferTob64Url(chunkData),
      };
    }

    await pipeline(
      source,
      chunker(MAX_CHUNK_SIZE, { flush: true }),
      async function (chunkedSource: AsyncIterable<Buffer>) {
        let chunkIndex = 0;
        let dataRebalancedIntoFinalChunk: Buffer | undefined;

        const activeChunkUploads: Promise<any>[] = [];

        for await (const chunkData of chunkedSource) {
          const currentChunk = chunks[chunkIndex];
          const chunkSize = currentChunk.maxByteRange - currentChunk.minByteRange;
          const expectedToBeFinalRebalancedChunk = dataRebalancedIntoFinalChunk != null;

          let chunkPayload: ChunkUploadPayload;

          if (chunkData.byteLength === chunkSize) {
            // If the transaction's data chunks were never rebalanced, this is the only code path
            // that will execute, as the incoming chunk data will always be exactly `chunkSize` bytes.
            chunkPayload = prepareChunkUploadPayload(chunkIndex, chunkData);
          } else if (chunkData.byteLength > chunkSize) {
            // If the incoming chunk data is larger than the expected size of the current chunk
            // it means that the transaction had chunks that were rebalanced to meet the minimum chunk size.
            //
            // It also means that the chunk we're currently processing should be the second to last
            // chunk.
            chunkPayload = prepareChunkUploadPayload(chunkIndex, chunkData.slice(0, chunkSize));

            dataRebalancedIntoFinalChunk = chunkData.slice(chunkSize);
          } else if (chunkData.byteLength < chunkSize && expectedToBeFinalRebalancedChunk) {
            // If this is the final rebalanced chunk, create the upload payload by concatenating the previous
            // chunk's data that was moved into this and the remaining stream data.
            chunkPayload = prepareChunkUploadPayload(
              chunkIndex,
              Buffer.concat(
                [dataRebalancedIntoFinalChunk!, chunkData],
                dataRebalancedIntoFinalChunk!.length + chunkData.length,
              ),
            );
          } else {
            throw Error('Transaction data stream terminated incorrectly.');
          }

          const chunkValid = await validatePath(
            txChunkData.data_root,
            parseInt(chunkPayload.offset),
            0,
            parseInt(chunkPayload.data_size),
            b64UrlToBuffer(chunkPayload.data_path),
          );

          if (!chunkValid) {
            throw new Error(`Unable to validate chunk ${chunkIndex}.`);
          }

          // Upload multiple transaction chunks in parallel to speed up the upload.

          // If we are already at the maximum concurrent chunk upload limit,
          // wait for all of them to complete before continuing.
          if (activeChunkUploads.length >= MAX_CONCURRENT_CHUNK_UPLOAD_COUNT) {
            await Promise.all(activeChunkUploads);
            // Clear the active chunk uploads array.
            activeChunkUploads.length = 0;
          }

          activeChunkUploads.push(
            backOff(() => arweave.api.post('chunk', chunkPayload), {
              retry: (err) => !FATAL_CHUNK_UPLOAD_ERRORS.includes(err.message),
            }),
          );

          chunkIndex++;
        }

        await Promise.all(activeChunkUploads);

        if (chunkIndex < chunks.length) {
          throw Error('Transaction upload incomplete.');
        }
      },
    );
  };
}
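
For completeness, here's a rough end-to-end sketch of creating, signing, and seeding a brand new transaction with the functions above (the wallet and file path arguments are illustrative assumptions; error handling is omitted). The data is streamed twice: once to build the transaction and once to upload its chunks.

import Arweave from 'arweave';
import { JWKInterface } from 'arweave/node/lib/wallet';
import { createReadStream } from 'fs';
// @ts-ignore
import { pipeline } from 'stream/promises';
import { createTransactionAsync } from './create-transaction-async';
import { uploadTransactionAsync } from './upload-transaction-async';

async function createAndUploadFile(arweave: Arweave, jwk: JWKInterface, filePath: string) {
  // First pass over the data: compute the chunks and build the transaction.
  const tx = await pipeline(createReadStream(filePath), createTransactionAsync({}, arweave, jwk));

  await arweave.transactions.sign(tx, jwk);

  // Second pass over the data: post the transaction header and seed its chunks.
  await pipeline(createReadStream(filePath), uploadTransactionAsync(tx, arweave));
}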

Usage

Here's some example usage of the above implementation in the form of Jest tests.

Transaction Creation

import Arweave from 'arweave';
import { CreateTransactionInterface } from 'arweave/node/common';
import { JWKInterface } from 'arweave/node/lib/wallet';
import { createReadStream } from 'fs';
import { readFile } from 'fs/promises';
// @ts-ignore
import { pipeline } from 'stream/promises';
import { createTransactionAsync } from './create-transaction-async';

describe('createTransactionAsync', () => {
  const arweave = new Arweave({
    host: 'arweave.net',
    protocol: 'https',
    port: 443,
    logging: false,
    timeout: 15000,
  });

  let wallet: JWKInterface;

  beforeAll(async () => {
    wallet = await arweave.wallets.generate();
  });

  it('should create transactions that match arweave-js', async () => {
    const filePath = './package-lock.json';
    const fileStream = createReadStream(filePath);

    const txAttrs = <Partial<CreateTransactionInterface>>{
      last_tx: 'MOCK_TX_ID',
      reward: '1',
    };

    const tx = await pipeline(fileStream, createTransactionAsync(txAttrs, arweave, wallet));

    const nativeTx = await arweave.createTransaction({ ...txAttrs, data: await readFile(filePath) }, wallet);

    await arweave.transactions.sign(tx, wallet);
    await arweave.transactions.sign(nativeTx, wallet);

    // Reset the data field from the `arweave-js` transaction as streamed transactions will not have this field.
    nativeTx.data = new Uint8Array(0);

    expect(tx).toMatchObject(nativeTx);
  });
});

Transaction Uploading

import Arweave from 'arweave';
import { createReadStream } from 'fs';
import { Readable } from 'stream';
// @ts-ignore
import { pipeline } from 'stream/promises';
import { generateTransactionChunksAsync } from './generate-transaction-chunks-async';
import { uploadTransactionAsync } from './upload-transaction-async';

describe('uploadTransactionAsync', () => {
  const arweave = new Arweave({
    host: 'arweave.net',
    protocol: 'https',
    port: 443,
    logging: false,
    timeout: 15000,
  });

  it('should successfully seed existing large transaction', async () => {
    jest.setTimeout(120 * 1000);

    const existingTxId = '<EXISTING-TX-ID>';
    const txDataFilePath = './Big-File.zip';

    const tx = await arweave.transactions.get(existingTxId);

    const txDataStreamForChunks = createReadStream(txDataFilePath);
    tx.chunks = await pipeline(txDataStreamForChunks, generateTransactionChunksAsync());

    const txDataStreamForUpload = createReadStream(txDataFilePath);
    const uploadOp = pipeline(txDataStreamForUpload, uploadTransactionAsync(tx, arweave, false));

    await expect(uploadOp).resolves.not.toThrow();
  });
});

Since the implementation makes use of NodeJS's native Streams API, the transaction data can easily be piped through encryption, compression, or other transforms before being passed to createTransactionAsync(). Note that the result of any such transformation must also be written to disk first, as the same transformed data will be needed again for uploadTransactionAsync(), as shown in the sketch below.
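
For example, a minimal sketch of this pattern using gzip compression (the file paths are illustrative assumptions): the transformed data is persisted first so that the exact same bytes can be re-streamed for both transaction creation and upload.

import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
// @ts-ignore
import { pipeline } from 'stream/promises';

// Compress the source data and persist the result. The gzipped file (not the original)
// is then what gets streamed into createTransactionAsync() and, later, uploadTransactionAsync().
async function gzipToDisk(srcPath: string, gzippedPath: string): Promise<void> {
  await pipeline(createReadStream(srcPath), createGzip(), createWriteStream(gzippedPath));
}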

Known Issues

The Arweave /chunk endpoint continues to return OK even when provided chunks with incorrect data.

@PaulRosca

Great stuff! Is there any specific reason to do the rebalancing for the last 2 chunks? I feel like it adds a lot of complexity for no apparent benefit, but I might be missing some Arweave-related context.
