Skip to content

Instantly share code, notes, and snippets.

@mattdesl
Last active February 11, 2022 09:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattdesl/82870aebb00eb519209fa9114fe5eb97 to your computer and use it in GitHub Desktop.
Save mattdesl/82870aebb00eb519209fa9114fe5eb97 to your computer and use it in GitHub Desktop.

A fork of the swap contract from hen-community. Also see v2 for some fixes and this thread for context.

Main changes:

  • term 'swap' is renamed to 'listing' for clarity
  • the first part of the file is a standard FA2 token contract (ideally should be a separate file)
    • however, token_royalties bigmap is added to mint entrypoint
    • also getRoyalties onchain view is added to FA2_mint class
  • the MRKT contract uses the FA2 onchain view to retrieve its royalties, rather than letting the seller decide
  • the manager fee has been removed

TODOs:

  • use a list to define royalty structure, allowing for multiple beneficiaries
  • reconsider how to handle platform/manager fee
  • test whether getRoyalties onchain view exists—if not, consider some kind of fallback?
  • include in the tests some guard to ensure the correct amount of tez is split during collect operation
  • include more tests
import smartpy as sp
# Reference:
# https://github.com/hen-community/objkt-swap/blob/jagracar/v3-prototype/smart-py/objkt_swap_v3.py
##
## ## Meta-Programming Configuration
##
## The `FA2_config` class holds the meta-programming configuration.
##
class FA2_config:
def __init__(self,
debug_mode = False,
single_asset = False,
non_fungible = False,
add_mutez_transfer = False,
readable = True,
force_layouts = True,
support_operator = True,
assume_consecutive_token_ids = True,
store_total_supply = True,
lazy_entry_points = False,
allow_self_transfer = False,
use_token_metadata_offchain_view = False
):
if debug_mode:
self.my_map = sp.map
else:
self.my_map = sp.big_map
# The option `debug_mode` makes the code generation use
# regular maps instead of big-maps, hence it makes inspection
# of the state of the contract easier.
self.use_token_metadata_offchain_view = use_token_metadata_offchain_view
# Include offchain view for accessing the token metadata (requires TZIP-016 contract metadata)
self.single_asset = single_asset
# This makes the contract save some gas and storage by
# working only for the token-id `0`.
self.non_fungible = non_fungible
# Enforce the non-fungibility of the tokens, i.e. the fact
# that total supply has to be 1.
self.readable = readable
# The `readable` option is a legacy setting that we keep around
# only for benchmarking purposes.
#
# User-accounts are kept in a big-map:
# `(user-address * token-id) -> ownership-info`.
#
# For the Babylon protocol, one had to use `readable = False`
# in order to use `PACK` on the keys of the big-map.
self.force_layouts = force_layouts
# The specification requires all interface-fronting records
# and variants to be *right-combs;* we keep
# this parameter to be able to compare performance & code-size.
self.support_operator = support_operator
# The operator entry-points always have to be there, but there is
# definitely a use-case for having them completely empty (saving
# storage and gas when `support_operator` is `False).
self.assume_consecutive_token_ids = assume_consecutive_token_ids
# For a previous version of the TZIP specification, it was
# necessary to keep track of the set of all tokens in the contract.
#
# The set of tokens is for now still available; this parameter
# guides how to implement it:
# If `true` we don't need a real set of token ids, just to know how
# many there are.
self.store_total_supply = store_total_supply
# Whether to store the total-supply for each token (next to
# the token-metadata).
self.add_mutez_transfer = add_mutez_transfer
# Add an entry point for the administrator to transfer tez potentially
# in the contract's balance.
self.lazy_entry_points = lazy_entry_points
#
# Those are “compilation” options of SmartPy into Michelson.
#
self.allow_self_transfer = allow_self_transfer
# Authorize call of `transfer` entry_point from self
name = "FA2"
if debug_mode:
name += "-debug"
if single_asset:
name += "-single_asset"
if non_fungible:
name += "-nft"
if add_mutez_transfer:
name += "-mutez"
if not readable:
name += "-no_readable"
if not force_layouts:
name += "-no_layout"
if not support_operator:
name += "-no_ops"
if not assume_consecutive_token_ids:
name += "-no_toknat"
if not store_total_supply:
name += "-no_totsup"
if lazy_entry_points:
name += "-lep"
if allow_self_transfer:
name += "-self_transfer"
self.name = name
## ## Auxiliary Classes and Values
##
## The definitions below implement SmartML-types and functions for various
## important types.
##
token_id_type = sp.TNat
class Error_message:
def __init__(self, config):
self.config = config
self.prefix = "FA2_"
def make(self, s): return (self.prefix + s)
def token_undefined(self): return self.make("TOKEN_UNDEFINED")
def insufficient_balance(self): return self.make("INSUFFICIENT_BALANCE")
def not_operator(self): return self.make("NOT_OPERATOR")
def not_owner(self): return self.make("NOT_OWNER")
def operators_unsupported(self): return self.make("OPERATORS_UNSUPPORTED")
def not_admin(self): return self.make("NOT_ADMIN")
def not_admin_or_operator(self): return self.make("NOT_ADMIN_OR_OPERATOR")
def paused(self): return self.make("PAUSED")
## The current type for a batched transfer in the specification is as
## follows:
##
## ```ocaml
## type transfer = {
## from_ : address;
## txs: {
## to_ : address;
## token_id : token_id;
## amount : nat;
## } list
## } list
## ```
##
## This class provides helpers to create and force the type of such elements.
## It uses the `FA2_config` to decide whether to set the right-comb layouts.
class Batch_transfer:
def __init__(self, config):
self.config = config
def get_transfer_type(self):
tx_type = sp.TRecord(to_ = sp.TAddress,
token_id = token_id_type,
amount = sp.TNat)
if self.config.force_layouts:
tx_type = tx_type.layout(
("to_", ("token_id", "amount"))
)
transfer_type = sp.TRecord(from_ = sp.TAddress,
txs = sp.TList(tx_type)).layout(
("from_", "txs"))
return transfer_type
def get_type(self):
return sp.TList(self.get_transfer_type())
def item(self, from_, txs):
v = sp.record(from_ = from_, txs = txs)
return sp.set_type_expr(v, self.get_transfer_type())
##
## `Operator_param` defines type types for the `%update_operators` entry-point.
class Operator_param:
def __init__(self, config):
self.config = config
def get_type(self):
t = sp.TRecord(
owner = sp.TAddress,
operator = sp.TAddress,
token_id = token_id_type)
if self.config.force_layouts:
t = t.layout(("owner", ("operator", "token_id")))
return t
def make(self, owner, operator, token_id):
r = sp.record(owner = owner,
operator = operator,
token_id = token_id)
return sp.set_type_expr(r, self.get_type())
## The class `Ledger_key` defines the key type for the main ledger (big-)map:
##
## - In *“Babylon mode”* we also have to call `sp.pack`.
## - In *“single-asset mode”* we can just use the user's address.
class Ledger_key:
def __init__(self, config):
self.config = config
def make(self, user, token):
user = sp.set_type_expr(user, sp.TAddress)
token = sp.set_type_expr(token, token_id_type)
if self.config.single_asset:
result = user
else:
result = sp.pair(user, token)
if self.config.readable:
return result
else:
return sp.pack(result)
## For now a value in the ledger is just the user's balance. Previous
## versions of the specification required more information; potential
## extensions may require other fields.
class Ledger_value:
def get_type():
return sp.TRecord(balance = sp.TNat)
def make(balance):
return sp.record(balance = balance)
## The link between operators and the addresses they operate is kept
## in a *lazy set* of `(owner × operator × token-id)` values.
##
## A lazy set is a big-map whose keys are the elements of the set and
## values are all `Unit`.
class Operator_set:
def __init__(self, config):
self.config = config
def inner_type(self):
return sp.TRecord(owner = sp.TAddress,
operator = sp.TAddress,
token_id = token_id_type
).layout(("owner", ("operator", "token_id")))
def key_type(self):
if self.config.readable:
return self.inner_type()
else:
return sp.TBytes
def make(self):
return self.config.my_map(tkey = self.key_type(), tvalue = sp.TUnit)
def make_key(self, owner, operator, token_id):
metakey = sp.record(owner = owner,
operator = operator,
token_id = token_id)
metakey = sp.set_type_expr(metakey, self.inner_type())
if self.config.readable:
return metakey
else:
return sp.pack(metakey)
def add(self, set, owner, operator, token_id):
set[self.make_key(owner, operator, token_id)] = sp.unit
def remove(self, set, owner, operator, token_id):
del set[self.make_key(owner, operator, token_id)]
def is_member(self, set, owner, operator, token_id):
return set.contains(self.make_key(owner, operator, token_id))
class Balance_of:
def request_type():
return sp.TRecord(
owner = sp.TAddress,
token_id = token_id_type).layout(("owner", "token_id"))
def response_type():
return sp.TList(
sp.TRecord(
request = Balance_of.request_type(),
balance = sp.TNat).layout(("request", "balance")))
def entry_point_type():
return sp.TRecord(
callback = sp.TContract(Balance_of.response_type()),
requests = sp.TList(Balance_of.request_type())
).layout(("requests", "callback"))
class Token_meta_data:
def __init__(self, config):
self.config = config
def get_type(self):
return sp.TRecord(token_id = sp.TNat, token_info = sp.TMap(sp.TString, sp.TBytes))
def set_type_and_layout(self, expr):
sp.set_type(expr, self.get_type())
## The set of all tokens is represented by a `nat` if we assume that token-ids
## are consecutive, or by an actual `(set nat)` if not.
##
## - Knowing the set of tokens is useful for throwing accurate error messages.
## - Previous versions of the specification required this set for functional
## behavior (operators interface had to deal with “all tokens”).
class Token_id_set:
def __init__(self, config):
self.config = config
def empty(self):
if self.config.assume_consecutive_token_ids:
# The "set" is its cardinal.
return sp.nat(0)
else:
return sp.set(t = token_id_type)
def add(self, totalTokens, tokenID):
if self.config.assume_consecutive_token_ids:
sp.verify(totalTokens == tokenID, message = "Token-IDs should be consecutive")
totalTokens.set(tokenID + 1)
else:
totalTokens.add(tokenID)
def contains(self, totalTokens, tokenID):
if self.config.assume_consecutive_token_ids:
return (tokenID < totalTokens)
else:
return totalTokens.contains(tokenID)
def cardinal(self, totalTokens):
if self.config.assume_consecutive_token_ids:
return totalTokens
else:
return sp.len(totalTokens)
##
## ## Implementation of the Contract
##
## `mutez_transfer` is an optional entry-point, hence we define it “outside” the
## class:
def mutez_transfer(contract, params):
sp.verify(sp.sender == contract.data.administrator)
sp.set_type(params.destination, sp.TAddress)
sp.set_type(params.amount, sp.TMutez)
sp.send(params.destination, params.amount)
##
## The `FA2` class builds a contract according to an `FA2_config` and an
## administrator address.
## It is inheriting from `FA2_core` which implements the strict
## standard and a few other classes to add other common features.
##
## - We see the use of
## [`sp.entry_point`](https://smartpy.io/docs/introduction/entry_points)
## as a function instead of using annotations in order to allow
## optional entry points.
## - The storage field `metadata_string` is a placeholder, the build
## system replaces the field annotation with a specific version-string, such
## as `"version_20200602_tzip_b916f32"`: the version of FA2-smartpy and
## the git commit in the TZIP [repository](https://gitlab.com/tzip/tzip) that
## the contract should obey.
class FA2_core(sp.Contract):
def __init__(self, config, metadata, **extra_storage):
self.config = config
self.error_message = Error_message(self.config)
self.operator_set = Operator_set(self.config)
self.operator_param = Operator_param(self.config)
self.token_id_set = Token_id_set(self.config)
self.ledger_key = Ledger_key(self.config)
self.token_meta_data = Token_meta_data(self.config)
self.batch_transfer = Batch_transfer(self.config)
if self.config.add_mutez_transfer:
self.transfer_mutez = sp.entry_point(mutez_transfer)
if config.lazy_entry_points:
self.add_flag("lazy-entry-points")
self.add_flag("initial-cast")
self.exception_optimization_level = "default-line"
self.init(
ledger = self.config.my_map(tvalue = Ledger_value.get_type()),
token_metadata = self.config.my_map(tkey = sp.TNat, tvalue = self.token_meta_data.get_type()),
token_royalties = self.config.my_map(tkey = sp.TNat, tvalue = sp.TRecord(
amount = sp.TNat,
beneficiary = sp.TAddress
)),
operators = self.operator_set.make(),
all_tokens = self.token_id_set.empty(),
metadata = metadata,
**extra_storage
)
if self.config.store_total_supply:
self.update_initial_storage(
total_supply = self.config.my_map(tkey = sp.TNat, tvalue = sp.TNat),
)
@sp.entry_point
def transfer(self, params):
sp.verify( ~self.is_paused(), message = self.error_message.paused() )
sp.set_type(params, self.batch_transfer.get_type())
sp.for transfer in params:
current_from = transfer.from_
sp.for tx in transfer.txs:
if self.config.single_asset:
sp.verify(tx.token_id == 0, message = "single-asset: token-id <> 0")
sender_verify = ((self.is_administrator(sp.sender)) |
(current_from == sp.sender))
message = self.error_message.not_owner()
if self.config.support_operator:
message = self.error_message.not_operator()
sender_verify |= (self.operator_set.is_member(self.data.operators,
current_from,
sp.sender,
tx.token_id))
if self.config.allow_self_transfer:
sender_verify |= (sp.sender == sp.self_address)
sp.verify(sender_verify, message = message)
sp.verify(
self.data.token_metadata.contains(tx.token_id),
message = self.error_message.token_undefined()
)
# If amount is 0 we do nothing now:
sp.if (tx.amount > 0):
from_user = self.ledger_key.make(current_from, tx.token_id)
sp.verify(
(self.data.ledger[from_user].balance >= tx.amount),
message = self.error_message.insufficient_balance())
to_user = self.ledger_key.make(tx.to_, tx.token_id)
self.data.ledger[from_user].balance = sp.as_nat(
self.data.ledger[from_user].balance - tx.amount)
sp.if self.data.ledger.contains(to_user):
self.data.ledger[to_user].balance += tx.amount
sp.else:
self.data.ledger[to_user] = Ledger_value.make(tx.amount)
sp.else:
pass
@sp.entry_point
def balance_of(self, params):
# paused may mean that balances are meaningless:
sp.verify( ~self.is_paused(), message = self.error_message.paused())
sp.set_type(params, Balance_of.entry_point_type())
def f_process_request(req):
user = self.ledger_key.make(req.owner, req.token_id)
sp.verify(self.data.token_metadata.contains(req.token_id), message = self.error_message.token_undefined())
sp.if self.data.ledger.contains(user):
balance = self.data.ledger[user].balance
sp.result(
sp.record(
request = sp.record(
owner = sp.set_type_expr(req.owner, sp.TAddress),
token_id = sp.set_type_expr(req.token_id, sp.TNat)),
balance = balance))
sp.else:
sp.result(
sp.record(
request = sp.record(
owner = sp.set_type_expr(req.owner, sp.TAddress),
token_id = sp.set_type_expr(req.token_id, sp.TNat)),
balance = 0))
res = sp.local("responses", params.requests.map(f_process_request))
destination = sp.set_type_expr(params.callback, sp.TContract(Balance_of.response_type()))
sp.transfer(res.value, sp.mutez(0), destination)
@sp.offchain_view(pure = True)
def get_balance(self, req):
"""This is the `get_balance` view defined in TZIP-12."""
sp.set_type(
req, sp.TRecord(
owner = sp.TAddress,
token_id = sp.TNat
).layout(("owner", "token_id")))
user = self.ledger_key.make(req.owner, req.token_id)
sp.verify(self.data.token_metadata.contains(req.token_id), message = self.error_message.token_undefined())
sp.result(self.data.ledger[user].balance)
@sp.entry_point
def update_operators(self, params):
sp.set_type(params, sp.TList(
sp.TVariant(
add_operator = self.operator_param.get_type(),
remove_operator = self.operator_param.get_type()
)
))
if self.config.support_operator:
sp.for update in params:
with update.match_cases() as arg:
with arg.match("add_operator") as upd:
sp.verify(
(upd.owner == sp.sender) | self.is_administrator(sp.sender),
message = self.error_message.not_admin_or_operator()
)
self.operator_set.add(self.data.operators,
upd.owner,
upd.operator,
upd.token_id)
with arg.match("remove_operator") as upd:
sp.verify(
(upd.owner == sp.sender) | self.is_administrator(sp.sender),
message = self.error_message.not_admin_or_operator()
)
self.operator_set.remove(self.data.operators,
upd.owner,
upd.operator,
upd.token_id)
else:
sp.failwith(self.error_message.operators_unsupported())
# this is not part of the standard but can be supported through inheritance.
def is_paused(self):
return sp.bool(False)
# this is not part of the standard but can be supported through inheritance.
def is_administrator(self, sender):
return sp.bool(False)
class FA2_administrator(FA2_core):
def is_administrator(self, sender):
return sender == self.data.administrator
@sp.entry_point
def set_administrator(self, params):
sp.verify(self.is_administrator(sp.sender), message = self.error_message.not_admin())
self.data.administrator = params
class FA2_pause(FA2_core):
def is_paused(self):
return self.data.paused
@sp.entry_point
def set_pause(self, params):
sp.verify(self.is_administrator(sp.sender), message = self.error_message.not_admin())
self.data.paused = params
class FA2_change_metadata(FA2_core):
@sp.entry_point
def set_metadata(self, k, v):
sp.verify(self.is_administrator(sp.sender), message = self.error_message.not_admin())
self.data.metadata[k] = v
class FA2_mint(FA2_core):
@sp.entry_point
def mint(self, params):
sp.verify(self.is_administrator(sp.sender), message = self.error_message.not_admin())
# We don't check for pauseness because we're the admin.
if self.config.single_asset:
sp.verify(params.token_id == 0, message = "single-asset: token-id <> 0")
if self.config.non_fungible:
sp.verify(params.amount == 1, message = "NFT-asset: amount <> 1")
sp.verify(
~ self.token_id_set.contains(self.data.all_tokens, params.token_id),
message = "NFT-asset: cannot mint twice same token"
)
user = self.ledger_key.make(params.address, params.token_id)
sp.if self.data.ledger.contains(user):
self.data.ledger[user].balance += params.amount
sp.else:
self.data.ledger[user] = Ledger_value.make(params.amount)
sp.if ~ self.token_id_set.contains(self.data.all_tokens, params.token_id):
self.token_id_set.add(self.data.all_tokens, params.token_id)
self.data.token_metadata[params.token_id] = sp.record(
token_id = params.token_id,
token_info = params.metadata
)
self.data.token_royalties[params.token_id] = sp.record(
amount = params.royalty_amount,
beneficiary = params.royalty_beneficiary
)
if self.config.store_total_supply:
self.data.total_supply[params.token_id] = params.amount + self.data.total_supply.get(params.token_id, default_value = 0)
@sp.onchain_view(name="getRoyalties")
def getRoyalties(self, token_id):
# Define the input parameter data type
sp.set_type(token_id, sp.TNat)
# Check that the token id is present in the royalties big map
sp.verify(self.data.token_royalties.contains(token_id), "The provided token_id doesn't exist")
# Return the royalty record
sp.result(self.data.token_royalties[token_id])
class FA2_token_metadata(FA2_core):
def set_token_metadata_view(self):
def token_metadata(self, tok):
"""
Return the token-metadata URI for the given token.
For a reference implementation, dynamic-views seem to be the
most flexible choice.
"""
sp.set_type(tok, sp.TNat)
sp.result(self.data.token_metadata[tok])
self.token_metadata = sp.offchain_view(pure = True, doc = "Get Token Metadata")(token_metadata)
def make_metadata(symbol, name, decimals):
"Helper function to build metadata JSON bytes values."
return (sp.map(l = {
# Remember that michelson wants map already in ordered
"decimals" : sp.utils.bytes_of_string("%d" % decimals),
"name" : sp.utils.bytes_of_string(name),
"symbol" : sp.utils.bytes_of_string(symbol)
}))
class FA2(FA2_change_metadata, FA2_token_metadata, FA2_mint, FA2_administrator, FA2_pause, FA2_core):
@sp.offchain_view(pure = True)
def count_tokens(self):
"""Get how many tokens are in this FA2 contract.
"""
sp.result(self.token_id_set.cardinal(self.data.all_tokens))
@sp.offchain_view(pure = True)
def does_token_exist(self, tok):
"Ask whether a token ID is exists."
sp.set_type(tok, sp.TNat)
sp.result(self.data.token_metadata.contains(tok))
@sp.offchain_view(pure = True)
def all_tokens(self):
if self.config.assume_consecutive_token_ids:
sp.result(sp.range(0, self.data.all_tokens))
else:
sp.result(self.data.all_tokens.elements())
@sp.offchain_view(pure = True)
def total_supply(self, tok):
if self.config.store_total_supply:
sp.result(self.data.total_supply[tok])
else:
sp.set_type(tok, sp.TNat)
sp.result("total-supply not supported")
@sp.offchain_view(pure = True)
def is_operator(self, query):
sp.set_type(query,
sp.TRecord(token_id = sp.TNat,
owner = sp.TAddress,
operator = sp.TAddress).layout(
("owner", ("operator", "token_id"))))
sp.result(
self.operator_set.is_member(self.data.operators,
query.owner,
query.operator,
query.token_id)
)
def __init__(self, config, metadata, admin):
# Let's show off some meta-programming:
if config.assume_consecutive_token_ids:
self.all_tokens.doc = """
This view is specified (but optional) in the standard.
This contract is built with assume_consecutive_token_ids =
True, so we return a list constructed from the number of tokens.
"""
else:
self.all_tokens.doc = """
This view is specified (but optional) in the standard.
This contract is built with assume_consecutive_token_ids =
False, so we convert the set of tokens from the storage to a list
to fit the expected type of TZIP-16.
"""
list_of_views = [
self.get_balance
, self.does_token_exist
, self.count_tokens
, self.all_tokens
, self.is_operator
]
if config.store_total_supply:
list_of_views = list_of_views + [self.total_supply]
if config.use_token_metadata_offchain_view:
self.set_token_metadata_view()
list_of_views = list_of_views + [self.token_metadata]
metadata_base = {
"version": config.name # will be changed if using fatoo.
, "description" : (
"This is a didactic reference implementation of FA2,"
+ " a.k.a. TZIP-012, using SmartPy.\n\n"
+ "This particular contract uses the configuration named: "
+ config.name + "."
)
, "interfaces": ["TZIP-012", "TZIP-016"]
, "authors": [
"Seb Mondet <https://seb.mondet.org>"
]
, "homepage": "https://gitlab.com/smondet/fa2-smartpy"
, "views": list_of_views
, "source": {
"tools": ["SmartPy"]
, "location": "https://gitlab.com/smondet/fa2-smartpy.git"
}
, "permissions": {
"operator":
"owner-or-operator-transfer" if config.support_operator else "owner-transfer"
, "receiver": "owner-no-hook"
, "sender": "owner-no-hook"
}
, "fa2-smartpy": {
"configuration" :
dict([(k, getattr(config, k)) for k in dir(config) if "__" not in k and k != 'my_map'])
}
}
self.init_metadata("metadata_base", metadata_base)
FA2_core.__init__(self, config, metadata, paused = False, administrator = admin)
## ## Tests
##
## ### Auxiliary Consumer Contract
##
## This contract is used by the tests to be on the receiver side of
## callback-based entry-points.
## It stores facts about the results in order to use `scenario.verify(...)`
## (cf.
## [documentation](https://smartpy.io/docs/scenarios/testing)).
class View_consumer(sp.Contract):
def __init__(self, contract):
self.contract = contract
self.init(last_sum = 0,
operator_support = not contract.config.support_operator)
@sp.entry_point
def reinit(self):
self.data.last_sum = 0
# It's also nice to make this contract have more than one entry point.
@sp.entry_point
def receive_balances(self, params):
sp.set_type(params, Balance_of.response_type())
self.data.last_sum = 0
sp.for resp in params:
self.data.last_sum += resp.balance
##
## ## Global Environment Parameters
##
## The build system communicates with the python script through
## environment variables.
## The function `environment_config` creates an `FA2_config` given the
## presence and values of a few environment variables.
def global_parameter(env_var, default):
try:
if os.environ[env_var] == "true" :
return True
if os.environ[env_var] == "false" :
return False
return default
except:
return default
def environment_config():
return FA2_config(
debug_mode = global_parameter("debug_mode", False),
single_asset = global_parameter("single_asset", False),
non_fungible = global_parameter("non_fungible", False),
add_mutez_transfer = global_parameter("add_mutez_transfer", False),
readable = global_parameter("readable", True),
force_layouts = global_parameter("force_layouts", True),
support_operator = global_parameter("support_operator", True),
assume_consecutive_token_ids =
global_parameter("assume_consecutive_token_ids", True),
store_total_supply = global_parameter("store_total_supply", False),
lazy_entry_points = global_parameter("lazy_entry_points", False),
allow_self_transfer = global_parameter("allow_self_transfer", False),
use_token_metadata_offchain_view = global_parameter("use_token_metadata_offchain_view", True),
)
class MRKT (sp.Contract):
LISTING_TYPE = sp.TRecord(
# address that created listing
issuer=sp.TAddress,
# token FA2 contract address
fa2=sp.TAddress,
# token ID being listed
token_id=sp.TNat,
# number of editions being listed at this price
token_amount=sp.TNat,
# the price of each edition in mutez
xtz_per_token=sp.TMutez).layout(
("issuer",
("fa2",
("token_id",
("token_amount", "xtz_per_token")
)
)
)
)
def __init__(self, metadata):
# storage types
self.init_type(sp.TRecord(
metadata=sp.TBigMap(sp.TString, sp.TBytes),
listings=sp.TBigMap(sp.TNat, MRKT.LISTING_TYPE),
counter=sp.TNat
))
# initialize storage
self.init(
metadata=metadata,
listings=sp.big_map(),
counter=0
)
def check_no_tez_transfer(self):
"""Checks that no tez were transferred in the operation."""
sp.verify(sp.amount == sp.tez(0), "The operation does not need tez transfers")
@sp.onchain_view()
def getRoyalties(self, listing_id):
"""Check if a given listing id is present in the listings big map."""
# Define the input parameter data type
sp.set_type(listing_id, sp.TNat)
# Check that the listing id is present in the listings big map
sp.verify(self.data.listings.contains(listing_id), "The provided listing_id doesn't exist")
# get listing
listing = self.data.listings[listing_id]
rec = sp.view("getRoyalties", listing.fa2, listing.token_id, t = sp.TRecord(
amount = sp.TNat,
beneficiary = sp.TAddress
)).open_some("invalid getRoyalties view")
# Return the royalty information
sp.result(rec)
@sp.entry_point
def collect(self, listing_id):
"""Collects one edition of a token that has already been listed."""
# Define the input parameter data type
sp.set_type(listing_id, sp.TNat)
# Check that the listing id is present in the listings big map
sp.verify(self.data.listings.contains(listing_id), "The provided listing_id doesn't exist")
# Check that the collector is not the creator of the listing
listing = self.data.listings[listing_id]
sp.verify(sp.sender != listing.issuer, "The collector cannot be the listing issuer")
# Check that the provided tez amount is exactly the edition price
sp.verify(sp.amount == listing.xtz_per_token, "The sent tez amount does not coincide with the edition price")
# Check that there is at least one edition available to collect
sp.verify(listing.token_amount > 0, "All editions have already been collected")
# Handle tez tranfers if the edition price is not zero
sp.if listing.xtz_per_token != sp.tez(0):
# get royalty info from token contract
royalties = sp.view("getRoyalties", listing.fa2, listing.token_id, t = sp.TRecord(
amount = sp.TNat,
beneficiary = sp.TAddress
)).open_some("invalid getRoyalties view")
# Send the royalties to the NFT creator
royalties_amount = sp.local(
"royalties_amount", sp.split_tokens(listing.xtz_per_token, royalties.amount, 1000)
)
# send royalties
sp.if royalties_amount.value > sp.mutez(0):
sp.send(royalties.beneficiary, royalties_amount.value)
# Send what is left to the listing issuer
sp.send(listing.issuer, sp.amount - royalties_amount.value)
# Transfer the token edition to the collector
self.fa2_transfer(
fa2=listing.fa2,
from_=sp.self_address,
to_=sp.sender,
token_id=listing.token_id,
token_amount=1
)
# Update the number of editions available in the listings big map
listing.token_amount = sp.as_nat(listing.token_amount - 1)
@sp.entry_point
def create_listing(self, params):
"""Lists several editions of a token at a fixed price."""
# Define the input parameter data type
sp.set_type(params, sp.TRecord(
# token FA2 contract address
fa2=sp.TAddress,
# token ID being listed
token_id=sp.TNat,
# number of editions being listed at this price
token_amount=sp.TNat,
# the price of each edition in mutez
xtz_per_token=sp.TMutez).layout(
("fa2",
("token_id",
("token_amount", "xtz_per_token")
)
)
)
)
# Check that no tez have been transferred
self.check_no_tez_transfer()
# Check that at least one edition will be listed
sp.verify(params.token_amount > 0, "At least one edition needs to be listed")
# Transfer all the editions to the marketplace account
self.fa2_transfer(
fa2=params.fa2,
from_=sp.sender,
to_=sp.self_address,
token_id=params.token_id,
token_amount=params.token_amount)
# Update the listings bigmap with the new listing information
self.data.listings[self.data.counter] = sp.record(
issuer=sp.sender,
fa2=params.fa2,
token_id=params.token_id,
token_amount=params.token_amount,
xtz_per_token=params.xtz_per_token
)
# Increase the listing counter
self.data.counter += 1
@sp.entry_point
def cancel_listing(self, listing_id):
"""Cancels an existing listing."""
# Define the input parameter data type
sp.set_type(listing_id, sp.TNat)
# Check that no tez have been transferred
self.check_no_tez_transfer()
# Check that the listing id is present in the listings big map
sp.verify(self.data.listings.contains(listing_id), "The provided listing_id doesn't exist")
# Check that the listing issuer is cancelling the listing
listing = self.data.listings[listing_id]
sp.verify(sp.sender == listing.issuer, "Only the listing issuer can cancel the listing")
# Check that there is at least one listed edition
sp.verify(listing.token_amount > 0, "All editions have been collected")
# Transfer the remaining token editions back to the issuer
self.fa2_transfer(
fa2=listing.fa2,
from_=sp.self_address,
to_=sp.sender,
token_id=listing.token_id,
token_amount=listing.token_amount)
# Delete the listing
del self.data.listings[listing_id]
def fa2_transfer(self, fa2, from_, to_, token_id, token_amount):
"""Transfers a number of editions of a FA2 token between two addresses."""
# Get a handle to the FA2 token transfer entry point
c = sp.contract(
t=sp.TList(sp.TRecord(
from_=sp.TAddress,
txs=sp.TList(sp.TRecord(
to_=sp.TAddress,
token_id=sp.TNat,
amount=sp.TNat).layout(("to_", ("token_id", "amount")))))),
address=fa2,
entry_point="transfer").open_some()
# Transfer the FA2 token editions to the new address
sp.transfer(
arg=sp.list([sp.record(
from_=from_,
txs=sp.list([sp.record(
to_=to_,
token_id=token_id,
amount=token_amount)]))]),
amount=sp.mutez(0),
destination=c)
@sp.onchain_view()
def has_listing(self, listing_id):
"""Check if a given listing id is present in the listings big map."""
# Define the input parameter data type
sp.set_type(listing_id, sp.TNat)
# Return True if the listing id is present in the listings big map
sp.result(self.data.listings.contains(listing_id))
@sp.onchain_view()
def get_listing(self, listing_id):
"""Returns the complete information from a given listing id."""
# Define the input parameter data type
sp.set_type(listing_id, sp.TNat)
# Check that the listing id is present in the listings big map
sp.verify(self.data.listings.contains(listing_id), "The provided listing_id doesn't exist")
# Return the listing information
sp.result(self.data.listings[listing_id])
@sp.onchain_view()
def get_listings_counter(self):
"""Returns the listings counter."""
sp.result(self.data.counter)
if "templates" not in __name__:
@sp.add_test(name = "SpectrumColors")
def test():
artist = sp.test_account("artist")
collector1 = sp.test_account("collector1")
collector2 = sp.test_account("collector2")
market = MRKT(sp.utils.metadata_of_url("ipfs://aaa"))
scenario = sp.test_scenario()
scenario.h1("MRKT")
scenario += market
admin = sp.test_account("Administrator")
alice = sp.test_account("Alice")
bob = sp.test_account("Robert")
# Let's display the accounts:
scenario.h2("Accounts")
scenario.show([admin, alice, bob])
fa2 = FA2(
config = environment_config(),
metadata = sp.utils.metadata_of_url("https://example.com"),
admin = admin.address)
scenario += fa2
tok0_md = FA2.make_metadata(
name = "The Token Zero",
decimals = 2,
symbol= "TK0" )
token_id = 0
scenario += fa2.mint(address = alice.address,
amount = 50,
metadata = tok0_md,
token_id = token_id,
royalty_beneficiary = alice.address,
royalty_amount = 250).run(sender = admin)
scenario += fa2.update_operators([
sp.variant("add_operator", sp.record(
owner=alice.address,
operator=market.address,
token_id=token_id))]).run(sender=alice)
edition_count = 50
edition_price = 1
scenario += market.create_listing(
fa2 = fa2.address,
token_id = 0,
token_amount = edition_count,
xtz_per_token = sp.tez(edition_price)
).run(sender = alice.address)
listing_id = sp.as_nat(market.get_listings_counter() - 1)
scenario += market.collect(
listing_id
).run(sender = bob.address, amount = sp.tez(edition_price))
scenario.p("Counter").show(market.get_listing(listing_id))
sp.add_compilation_target("MRKT", MRKT(sp.utils.metadata_of_url("ipfs://aaa")))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment