import copy
import dataclasses
import warnings
from base64 import b64decode
from collections.abc import Callable, Sequence
from pathlib import Path
from typing import Any, cast
import algosdk
from algosdk import abi, transaction
from algosdk.account import address_from_private_key
from algosdk.atomic_transaction_composer import (
ABI_RETURN_HASH,
ABIResult,
AccountTransactionSigner,
AtomicTransactionComposer,
AtomicTransactionResponse,
LogicSigTransactionSigner,
MultisigTransactionSigner,
TransactionSigner,
TransactionWithSigner,
)
from algosdk.logic import get_application_address
from algosdk.v2client.algod import AlgodClient
from pyteal import ABIReturnSubroutine, CallConfig, MethodConfig
from beaker.application import Application
from beaker.application_specification import (
ApplicationSpecification,
DefaultArgumentDict,
MethodHints,
)
from beaker.client.logic_error import LogicException, parse_logic_error
from beaker.client.state_decode import decode_state
from beaker.compilation import Program
from beaker.consts import num_extra_program_pages
[docs]class ApplicationClient:
[docs] def __init__(
self,
client: AlgodClient,
app: ApplicationSpecification | str | Path | Application,
app_id: int = 0,
signer: TransactionSigner | None = None,
sender: str | None = None,
suggested_params: transaction.SuggestedParams | None = None,
):
self.client = client
self.app: ApplicationSpecification
match app:
case ApplicationSpecification() as compiled_app:
self.app = compiled_app
case Application() as app:
self.app = app.build(client)
case Path() as path:
if path.is_dir():
path = path / "application.json"
self.app = ApplicationSpecification.from_json(
path.read_text(encoding="utf8")
)
case str():
self.app = ApplicationSpecification.from_json(app)
self.app_id = app_id
self.app_addr = get_application_address(app_id) if self.app_id != 0 else None
self.signer = signer
self.sender = sender
if signer is not None and sender is None:
self.sender = self.get_sender(sender, self.signer)
self.approval = Program(self.app.approval_program, self.client)
self.clear = Program(self.app.clear_program, self.client)
self.suggested_params = suggested_params
def find_method(predicate: Callable[[MethodConfig], bool]) -> abi.Method | None:
matching = [
method
for method in self.app.contract.methods
if predicate(self._method_hints(method).call_config)
]
if len(matching) == 1:
return matching[0]
elif len(matching) > 1:
# TODO: warn?
pass
return None
self.on_create = find_method(
lambda x: any([x & CallConfig.CREATE for x in dataclasses.astuple(x)])
)
self.on_update = find_method(lambda x: x.update_application != CallConfig.NEVER)
self.on_opt_in = find_method(lambda x: x.opt_in != CallConfig.NEVER)
self.on_close_out = find_method(lambda x: x.close_out != CallConfig.NEVER)
self.on_delete = find_method(lambda x: x.delete_application != CallConfig.NEVER)
[docs] def create(
self,
sender: str | None = None,
signer: TransactionSigner | None = None,
suggested_params: transaction.SuggestedParams | None = None,
on_complete: transaction.OnComplete = transaction.OnComplete.NoOpOC,
extra_pages: int | None = None,
**kwargs: Any, # noqa: ANN401
) -> tuple[int, str, str]:
"""Submits a signed ApplicationCallTransaction with application id == 0 and the schema and source from the Application passed"""
if extra_pages is None:
extra_pages = num_extra_program_pages(
self.approval.raw_binary, self.clear.raw_binary
)
sp = self.get_suggested_params(suggested_params)
signer = self.get_signer(signer)
sender = self.get_sender(sender, signer)
atc = AtomicTransactionComposer()
if self.on_create is not None:
self.add_method_call(
atc,
self.on_create,
sender=sender,
suggested_params=sp,
on_complete=on_complete,
approval_program=self.approval.raw_binary,
clear_program=self.clear.raw_binary,
global_schema=self.app.global_state_schema,
local_schema=self.app.local_state_schema,
extra_pages=extra_pages,
**kwargs,
)
else:
atc.add_transaction(
TransactionWithSigner(
txn=transaction.ApplicationCreateTxn(
sender=sender,
sp=sp,
on_complete=on_complete,
approval_program=self.approval.raw_binary,
clear_program=self.clear.raw_binary,
global_schema=self.app.global_state_schema,
local_schema=self.app.local_state_schema,
extra_pages=extra_pages,
**kwargs,
),
signer=signer,
)
)
create_result = self._execute_atc(atc)
create_txid = create_result.tx_ids[0]
result = self.client.pending_transaction_info(create_txid)
app_id = result["application-index"]
app_addr = get_application_address(app_id)
self.app_id = app_id
self.app_addr = app_addr
return app_id, app_addr, create_txid
[docs] def update(
self,
sender: str | None = None,
signer: TransactionSigner | None = None,
suggested_params: transaction.SuggestedParams | None = None,
**kwargs: Any, # noqa: ANN401
) -> str:
"""Submits a signed ApplicationCallTransaction with OnComplete set to UpdateApplication and source from the Application passed"""
sp = self.get_suggested_params(suggested_params)
signer = self.get_signer(signer)
sender = self.get_sender(sender, signer)
atc = AtomicTransactionComposer()
if self.on_update is not None:
self.add_method_call(
atc,
self.on_update,
on_complete=transaction.OnComplete.UpdateApplicationOC,
sender=sender,
suggested_params=sp,
approval_program=self.approval.raw_binary,
clear_program=self.clear.raw_binary,
**kwargs,
)
else:
atc.add_transaction(
TransactionWithSigner(
txn=transaction.ApplicationUpdateTxn(
sender=sender,
sp=sp,
index=self.app_id,
approval_program=self.approval.raw_binary,
clear_program=self.clear.raw_binary,
**kwargs,
),
signer=signer,
)
)
update_result = self._execute_atc(atc)
return update_result.tx_ids[0]
[docs] def opt_in(
self,
sender: str | None = None,
signer: TransactionSigner | None = None,
suggested_params: transaction.SuggestedParams | None = None,
**kwargs: Any, # noqa: ANN401
) -> str:
"""Submits a signed ApplicationCallTransaction with OnComplete set to OptIn"""
sp = self.get_suggested_params(suggested_params)
signer = self.get_signer(signer)
sender = self.get_sender(sender, signer)
atc = AtomicTransactionComposer()
if self.on_opt_in is not None:
self.add_method_call(
atc,
self.on_opt_in,
on_complete=transaction.OnComplete.OptInOC,
sender=sender,
suggested_params=sp,
signer=signer,
**kwargs,
)
else:
atc.add_transaction(
TransactionWithSigner(
txn=transaction.ApplicationOptInTxn(
sender=sender,
sp=sp,
index=self.app_id,
**kwargs,
),
signer=signer,
)
)
opt_in_result = self._execute_atc(atc)
return opt_in_result.tx_ids[0]
[docs] def close_out(
self,
sender: str | None = None,
signer: TransactionSigner | None = None,
suggested_params: transaction.SuggestedParams | None = None,
**kwargs: Any, # noqa: ANN401
) -> str:
"""Submits a signed ApplicationCallTransaction with OnComplete set to CloseOut"""
sp = self.get_suggested_params(suggested_params)
signer = self.get_signer(signer)
sender = self.get_sender(sender, signer)
atc = AtomicTransactionComposer()
if self.on_close_out is not None:
self.add_method_call(
atc,
self.on_close_out,
on_complete=transaction.OnComplete.CloseOutOC,
sender=sender,
suggested_params=sp,
signer=signer,
**kwargs,
)
else:
atc.add_transaction(
TransactionWithSigner(
txn=transaction.ApplicationCloseOutTxn(
sender=sender,
sp=sp,
index=self.app_id,
**kwargs,
),
signer=signer,
)
)
close_out_result = self._execute_atc(atc)
return close_out_result.tx_ids[0]
[docs] def clear_state(
self,
sender: str | None = None,
signer: TransactionSigner | None = None,
suggested_params: transaction.SuggestedParams | None = None,
**kwargs: Any, # noqa: ANN401
) -> str:
"""Submits a signed ApplicationCallTransaction with OnComplete set to ClearState"""
sp = self.get_suggested_params(suggested_params)
signer = self.get_signer(signer)
sender = self.get_sender(sender, signer)
atc = AtomicTransactionComposer()
atc.add_transaction(
TransactionWithSigner(
txn=transaction.ApplicationClearStateTxn(
sender=sender,
sp=sp,
index=self.app_id,
**kwargs,
),
signer=signer,
)
)
clear_state_result = atc.execute(self.client, 4)
return clear_state_result.tx_ids[0]
[docs] def delete(
self,
sender: str | None = None,
signer: TransactionSigner | None = None,
suggested_params: transaction.SuggestedParams | None = None,
**kwargs: Any, # noqa: ANN401
) -> str:
"""Submits a signed ApplicationCallTransaction with OnComplete set to DeleteApplication"""
sp = self.get_suggested_params(suggested_params)
signer = self.get_signer(signer)
sender = self.get_sender(sender, signer)
atc = AtomicTransactionComposer()
if self.on_delete:
self.add_method_call(
atc,
self.on_delete,
on_complete=transaction.OnComplete.DeleteApplicationOC,
sender=sender,
suggested_params=sp,
signer=signer,
**kwargs,
)
else:
atc.add_transaction(
TransactionWithSigner(
txn=transaction.ApplicationDeleteTxn(
sender=sender,
sp=sp,
index=self.app_id,
**kwargs,
),
signer=signer,
)
)
delete_result = self._execute_atc(atc)
return delete_result.tx_ids[0]
[docs] def prepare(
self,
signer: TransactionSigner | None = None,
sender: str | None = None,
**kwargs: Any, # noqa: ANN401
) -> "ApplicationClient":
"""makes a copy of the current ApplicationClient and the fields passed"""
ac = copy.copy(self)
ac.signer = ac.get_signer(signer)
ac.sender = ac.get_sender(sender, ac.signer)
ac.__dict__.update(**kwargs)
return ac
[docs] def call(
self,
method: abi.Method | ABIReturnSubroutine | str,
sender: str | None = None,
signer: TransactionSigner | None = None,
suggested_params: transaction.SuggestedParams | None = None,
on_complete: transaction.OnComplete = transaction.OnComplete.NoOpOC,
local_schema: transaction.StateSchema | None = None,
global_schema: transaction.StateSchema | None = None,
approval_program: bytes | None = None,
clear_program: bytes | None = None,
extra_pages: int | None = None,
accounts: list[str] | None = None,
foreign_apps: list[int] | None = None,
foreign_assets: list[int] | None = None,
boxes: Sequence[tuple[int, bytes | bytearray | str | int]] | None = None,
note: bytes | None = None,
lease: bytes | None = None,
rekey_to: str | None = None,
atc: AtomicTransactionComposer | None = None,
**kwargs: Any, # noqa: ANN401
) -> ABIResult:
"""Handles calling the application"""
method = self._resolve_abi_method(method)
hints = self._method_hints(method)
if atc is None:
atc = AtomicTransactionComposer()
atc = self.add_method_call(
atc,
method,
sender,
signer,
suggested_params=suggested_params,
on_complete=on_complete,
local_schema=local_schema,
global_schema=global_schema,
approval_program=approval_program,
clear_program=clear_program,
extra_pages=extra_pages,
accounts=accounts,
foreign_apps=foreign_apps,
foreign_assets=foreign_assets,
note=note,
lease=lease,
rekey_to=rekey_to,
boxes=boxes,
**kwargs,
)
if atc is None:
raise Exception("ATC none?")
# If its a read-only method, use dryrun (TODO: swap with simulate later?)
if hints.read_only:
dr_req = transaction.create_dryrun(self.client, atc.gather_signatures())
dr_result = self.client.dryrun(dr_req)
for txn in dr_result["txns"]:
if "app-call-messages" in txn:
if "REJECT" in txn["app-call-messages"]:
msg = ", ".join(txn["app-call-messages"])
raise Exception(f"Dryrun for readonly method failed: {msg}")
method_results = self._parse_result(
{0: method}, dr_result["txns"], atc.tx_ids
)
return method_results.pop()
result = self._execute_atc(atc)
return result.abi_results.pop()
# TEMPORARY, use SDK one when available
def _parse_result(
self,
methods: dict[int, abi.Method],
txns: list[dict[str, Any]],
txids: list[str],
) -> list[ABIResult]:
method_results = []
for i, tx_info in enumerate(txns):
raw_value = b""
return_value = None
decode_error = None
if i not in methods:
continue
# Parse log for ABI method return value
try:
if methods[i].returns.type == abi.Returns.VOID:
method_results.append(
ABIResult(
tx_id=txids[i],
raw_value=raw_value,
return_value=return_value,
decode_error=decode_error,
tx_info=tx_info,
method=methods[i],
)
)
continue
logs = tx_info["logs"] if "logs" in tx_info else []
# Look for the last returned value in the log
if not logs:
raise Exception("No logs")
result = logs[-1]
# Check that the first four bytes is the hash of "return"
result_bytes = b64decode(result)
if len(result_bytes) < 4 or result_bytes[:4] != ABI_RETURN_HASH:
raise Exception("no logs")
raw_value = result_bytes[4:]
abi_return_type = methods[i].returns.type
if isinstance(abi_return_type, abi.ABIType):
return_value = abi_return_type.decode(raw_value)
else:
return_value = raw_value
except Exception as e:
decode_error = e
method_results.append(
ABIResult(
tx_id=txids[i],
raw_value=raw_value,
return_value=return_value,
decode_error=decode_error,
tx_info=tx_info,
method=methods[i],
)
)
return method_results
[docs] def add_method_call(
self,
atc: AtomicTransactionComposer,
method: abi.Method | ABIReturnSubroutine | str,
sender: str | None = None,
signer: TransactionSigner | None = None,
suggested_params: transaction.SuggestedParams | None = None,
on_complete: transaction.OnComplete = transaction.OnComplete.NoOpOC,
local_schema: transaction.StateSchema | None = None,
global_schema: transaction.StateSchema | None = None,
approval_program: bytes | None = None,
clear_program: bytes | None = None,
extra_pages: int | None = None,
accounts: list[str] | None = None,
foreign_apps: list[int] | None = None,
foreign_assets: list[int] | None = None,
boxes: Sequence[tuple[int, bytes | bytearray | str | int]] | None = None,
note: bytes | None = None,
lease: bytes | None = None,
rekey_to: str | None = None,
**kwargs: Any, # noqa: ANN401
) -> AtomicTransactionComposer:
"""Adds a transaction to the AtomicTransactionComposer passed"""
sp = self.get_suggested_params(suggested_params)
signer = self.get_signer(signer)
sender = self.get_sender(sender, signer)
method = self._resolve_abi_method(method)
hints = self._method_hints(method)
args = []
for method_arg in method.args:
name = method_arg.name
if name in kwargs:
argument = kwargs.pop(name)
if isinstance(argument, dict):
if hints.structs is None or name not in hints.structs:
raise Exception(
f"Argument missing struct hint: {name}. Check argument name and type"
)
elems = hints.structs[name]["elements"]
argument = [
argument[field_name] for field_name, field_type in elems
]
args.append(argument)
elif (
hints.default_arguments is not None and name in hints.default_arguments
):
default_arg = hints.default_arguments[name]
if default_arg is not None:
args.append(self.resolve(default_arg))
else:
raise Exception(f"Unspecified argument: {name}")
if kwargs:
warnings.warn(f"Unused arguments specified: {', '.join(kwargs)}")
if boxes is not None:
# TODO: algosdk actually does this, but it's type hints say otherwise...
encoded_boxes = [
(id_, algosdk.encoding.encode_as_bytes(name)) for id_, name in boxes
]
else:
encoded_boxes = None
atc.add_method_call(
self.app_id,
method,
sender,
sp,
signer,
method_args=args,
on_complete=on_complete,
local_schema=local_schema,
global_schema=global_schema,
approval_program=approval_program,
clear_program=clear_program,
extra_pages=extra_pages,
accounts=accounts,
foreign_apps=foreign_apps,
foreign_assets=foreign_assets,
boxes=encoded_boxes,
note=note,
lease=lease,
rekey_to=rekey_to,
)
return atc
def _resolve_abi_method(
self, method: abi.Method | ABIReturnSubroutine | str
) -> abi.Method:
if isinstance(method, ABIReturnSubroutine):
return method.method_spec()
elif isinstance(method, str):
try:
return next(
iter(
m
for m in self.app.contract.methods
if m.get_signature() == method
)
)
except StopIteration:
pass
return self.app.contract.get_method_by_name(method)
else:
return method
def add_transaction(
self, atc: AtomicTransactionComposer, txn: transaction.Transaction
) -> AtomicTransactionComposer:
if self.signer is None:
raise Exception("No signer available")
atc.add_transaction(TransactionWithSigner(txn=txn, signer=self.signer))
return atc
[docs] def fund(self, amt: int, addr: str | None = None) -> str:
"""convenience method to pay the address passed, defaults to paying the app address for this client from the current signer"""
sender = self.get_sender()
signer = self.get_signer()
sp = self.client.suggested_params()
rcv = self.app_addr if addr is None else addr
atc = AtomicTransactionComposer()
atc.add_transaction(
TransactionWithSigner(
txn=transaction.PaymentTxn(sender, sp, rcv, amt),
signer=signer,
)
)
atc.execute(self.client, 4)
return atc.tx_ids.pop()
[docs] def get_global_state(
self, *, raw: bool = False
) -> dict[bytes | str, bytes | str | int]:
"""gets the global state info for the app id set"""
global_state = self.client.application_info(self.app_id)
return cast(
dict[bytes | str, bytes | str | int],
decode_state(
global_state.get("params", {}).get("global-state", {}), raw=raw
),
)
[docs] def get_local_state(
self, account: str | None = None, *, raw: bool = False
) -> dict[str | bytes, bytes | str | int]:
"""gets the local state info for the app id set and the account specified"""
if account is None:
account = self.get_sender()
acct_state = self.client.account_application_info(account, self.app_id)
if (
"app-local-state" not in acct_state
or "key-value" not in acct_state["app-local-state"]
):
return {}
return cast(
dict[str | bytes, bytes | str | int],
decode_state(acct_state["app-local-state"]["key-value"], raw=raw),
)
[docs] def get_application_account_info(self) -> dict[str, Any]:
"""gets the account info for the application account"""
return self.client.account_info(self.app_addr)
def get_box_names(self) -> list[bytes]:
box_resp = self.client.application_boxes(self.app_id)
return [b64decode(box["name"]) for box in box_resp["boxes"]]
def get_box_contents(self, name: bytes) -> bytes:
contents = self.client.application_box_by_name(self.app_id, name)
return b64decode(contents["value"])
def resolve(self, to_resolve: DefaultArgumentDict) -> Any: # noqa: ANN401
match to_resolve:
case {"source": "constant", "data": data}:
return data
case {"source": "global-state", "data": str() as key}:
global_state = self.get_global_state(raw=True)
return global_state[key.encode()]
case {"source": "local-state", "data": str() as key}:
acct_state = self.get_local_state(self.get_sender(), raw=True)
return acct_state[key.encode()]
case {"source": "abi-method", "data": dict() as method_dict}:
method = abi.Method.undictify(method_dict)
result = self.call(method)
return result.return_value
case {"source": source}:
raise ValueError(f"Unrecognized default argument source: {source}")
case _:
raise TypeError("Unable to interpret default argument specification")
def _method_hints(self, method: abi.Method) -> MethodHints:
sig = method.get_signature()
if sig not in self.app.hints:
return MethodHints()
return self.app.hints[sig]
def get_suggested_params(
self,
sp: transaction.SuggestedParams | None = None,
) -> transaction.SuggestedParams:
if sp is not None:
return sp
if self.suggested_params is not None:
return self.suggested_params
return self.client.suggested_params()
def _execute_atc(
self, atc: AtomicTransactionComposer, wait_rounds: int = 4
) -> AtomicTransactionResponse:
try:
return atc.execute(self.client, wait_rounds=wait_rounds)
except Exception as ex:
if self.approval.source_map and self.approval.raw_binary:
logic_error_data = parse_logic_error(str(ex))
if logic_error_data is not None:
raise LogicException(
logic_error=ex,
program=self.approval.teal,
map=self.approval.source_map,
**logic_error_data,
) from ex
raise ex
def get_signer(self, signer: TransactionSigner | None = None) -> TransactionSigner:
if signer is not None:
return signer
if self.signer is not None:
return self.signer
raise Exception("No signer provided")
def get_sender(
self, sender: str | None = None, signer: TransactionSigner | None = None
) -> str:
if sender is not None:
return sender
if signer is None and self.sender is not None:
return self.sender
signer = self.get_signer(signer)
if isinstance(signer, AccountTransactionSigner):
return address_from_private_key(signer.private_key)
elif isinstance(signer, MultisigTransactionSigner):
return signer.msig.address()
elif isinstance(signer, LogicSigTransactionSigner):
return signer.lsig.address()
raise Exception("No sender provided")