Skip to content

Instantly share code, notes, and snippets.

@neurosis69
Last active February 21, 2024 13:04
Show Gist options
  • Save neurosis69/210102769fc8380bf9dad2bb784e5bc2 to your computer and use it in GitHub Desktop.
Save neurosis69/210102769fc8380bf9dad2bb784e5bc2 to your computer and use it in GitHub Desktop.
coin_store.py with batch inserts
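  • Added timing instrumentation to the new_block function: the durations of _add_coin_records and _set_spent are now logged separately at INFO level.
  • Changed the _add_coin_records function to insert rows with multi-row INSERT statements, in batches of MAX_ROWS_PER_BATCH = 50 // fields_per_record rows.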
    async def new_block(
        self,
        height: uint32,
        timestamp: uint64,
        included_reward_coins: Set[Coin],
        tx_additions: List[Coin],
        tx_removals: List[bytes32],
    ) -> List[CoinRecord]:
        """
        Apply the additions and removals of one block to the coin store.

        A CoinRecord is built for every coin in tx_additions and for every coin in
        included_reward_coins, all of them are handed to _add_coin_records, and
        tx_removals is then handed to _set_spent. Both steps are timed and reported
        at INFO level; the overall duration is reported at WARNING level when it
        exceeds 10 seconds and at DEBUG level otherwise.

        Only called for blocks which are blocks (and thus have rewards and transactions).
        Returns a list of the CoinRecords that were added by this block.
        """
        start = time.monotonic()

        # Transaction additions come first, in the order given. The fourth
        # constructor argument is False for them (presumably the coinbase flag).
        additions = [CoinRecord(coin, height, uint32(0), False, timestamp) for coin in tx_additions]

        # Height 0 must carry no reward coins; any other height at least two.
        reward_count = len(included_reward_coins)
        if height == 0:
            assert reward_count == 0
        else:
            assert reward_count >= 2

        # Reward coins are appended after the transaction additions, with the
        # fourth constructor argument set to True.
        additions.extend(CoinRecord(coin, height, uint32(0), True, timestamp) for coin in included_reward_coins)

        # Time the insertion of the new records on its own.
        start_add_coin_records = time.monotonic()
        await self._add_coin_records(additions)
        end_add_coin_records = time.monotonic()
        log.info(
            f"Height {height}: _add_coin_records took {end_add_coin_records - start_add_coin_records:0.2f}s for {len(additions)} additions."
        )

        # Time the spend-marking of the removals on its own.
        start_set_spent = time.monotonic()
        await self._set_spent(tx_removals, height)
        end_set_spent = time.monotonic()
        log.info(
            f"Height {height}: _set_spent took {end_set_spent - start_set_spent:0.2f}s for {len(tx_removals)} removals."
        )

        end = time.monotonic()

        # Only a slow block (more than 10 seconds overall) is worth a warning.
        level = logging.DEBUG
        if end - start > 10:
            level = logging.WARNING
        log.log(
            level,
            f"Height {height}: It took {end - start:0.2f}s to apply {len(tx_additions)} additions and "
            f"{len(tx_removals)} removals to the coin store. Make sure "
            "blockchain database is on a fast drive",
        )

        return additions

...

    # Store CoinRecord in DB
    async def _add_coin_records(self, records: List[CoinRecord]) -> None:
        """
        Insert the given CoinRecords into the coin_record table in batches.

        Each batch is written with one multi-row ``INSERT ... VALUES (...), (...)``
        statement. A row binds 8 values when ``self.db_wrapper.db_version == 2`` and
        9 values otherwise (the extra value is ``spent``, and the hashes are stored
        as hex strings instead of raw bytes). MAX_ROWS_PER_BATCH is chosen so that a
        single statement never binds more than 50 parameters.

        The writer context is entered once for the whole call rather than once per
        batch, so all batches of one call share the same write scope.
        NOTE(review): this assumes writer_maybe_transaction() wraps its body in a
        transaction (or joins the caller's); if so, a failure in a later batch no
        longer leaves earlier batches of the same call committed on their own.

        Nothing is executed, and the writer context is not entered, when ``records``
        is empty.
        """
        if not records:
            return

        # The schema version cannot change during the call; look it up once
        # instead of once per record.
        is_v2 = self.db_wrapper.db_version == 2
        fields_per_record = 8 if is_v2 else 9
        MAX_ROWS_PER_BATCH = 50 // fields_per_record

        # "(?, ?, ..., ?)" for a single row; repeated once per row of a batch.
        row_placeholder = "(" + ", ".join(["?"] * fields_per_record) + ")"

        def generate_values(record: CoinRecord):
            """Return the column values of one record, in table column order."""
            if is_v2:
                return (
                    record.coin.name(),
                    record.confirmed_block_index,
                    record.spent_block_index,
                    int(record.coinbase),
                    record.coin.puzzle_hash,
                    record.coin.parent_coin_info,
                    bytes(uint64(record.coin.amount)),
                    record.timestamp,
                )
            return (
                record.coin.name().hex(),
                record.confirmed_block_index,
                record.spent_block_index,
                int(record.spent),
                int(record.coinbase),
                record.coin.puzzle_hash.hex(),
                record.coin.parent_coin_info.hex(),
                bytes(uint64(record.coin.amount)),
                record.timestamp,
            )

        async with self.db_wrapper.writer_maybe_transaction() as conn:
            for i in range(0, len(records), MAX_ROWS_PER_BATCH):
                batch = records[i:i + MAX_ROWS_PER_BATCH]
                # The last batch may be shorter, so size the statement per batch.
                placeholders = ", ".join([row_placeholder] * len(batch))
                # One flat parameter list: the values of row 0, then row 1, ...
                flat_values = [value for record in batch for value in generate_values(record)]
                # A single statement with a single parameter set is execute(),
                # not executemany() with a one-element sequence.
                await conn.execute(f"INSERT INTO coin_record VALUES {placeholders}", flat_values)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment