如何使用 GCP KMS 获取有效且确定性的签名并恢复以太坊交易的地址?

How to get a valid and deterministic signature and recover the address of an Ethereum transaction using GCP KMS?

提问人:WestC 提问时间:11/13/2023 最后编辑:WestC 更新时间:11/22/2023 访问量:55

问:

我想使用 GCP KMS 签署以太坊交易并恢复发件人地址。然而,奇怪的是,当我提供完全相同的交易时,我没有得到相同的签名。我使用的是使用该算法的 HSM 密钥。ec-sign-secp256k1-sha256

以下是我用于签署交易和恢复发件人地址的代码:

from google.cloud import kms
from google.oauth2 import service_account
from crcmod.predefined import mkPredefinedCrcFun  # type: ignore
from dotenv import load_dotenv
from json import loads
from web3 import Web3
from rlp import encode
from eth_utils import (
    add_0x_prefix,
    keccak,
    remove_0x_prefix,
    to_bytes,
    to_int,
    to_checksum_address,
    int_to_big_endian,
    big_endian_to_int,
)
from eth_keys import keys

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from pyasn1.type import univ, namedtype
from pyasn1.codec.der.decoder import decode

load_dotenv()

GOOGLE_APPLICATION_CREDENTIALS = loads(os.getenv("GOOGLE_APPLICATION_CREDENTIALS"))
CREDENTIALS = service_account.Credentials.from_service_account_info(
    GOOGLE_APPLICATION_CREDENTIALS
)

class EcdsaSig(univ.Sequence):
    componentType = namedtype.NamedTypes(
        namedtype.NamedType("r", univ.Integer()),
        namedtype.NamedType("s", univ.Integer()),
    )

    def decode_ecdsa_sig(self, asn_string_buffer):
        ecdsa_sig, _ = decode(asn_string_buffer, asn1Spec=EcdsaSig())

        return ecdsa_sig["r"], ecdsa_sig["s"]


class HSM:
    client: kms.KeyManagementServiceClient
    key_version_name: str
    address: str

    def __init__(
        self,
        project_id: str,
        location_id: str,
        key_ring_id: str,
        key_id: str,
        version_id: str,
    ) -> None:
        self.client = kms.KeyManagementServiceClient(credentials=CREDENTIALS)
        self.key_version_name = self.client.crypto_key_version_path(
            project_id, location_id, key_ring_id, key_id, version_id
        )
        self.address = ""

    def sign(self, transaction: dict) -> None:
        message_bytes = to_bytes(hexstr=self.__rlp_encode_transaction(transaction))
        hash_message = keccak(primitive=message_bytes)
        signature, r, s = self.__request_kms_signature(hash_message)
        self.__derive_ethereum_address()
        _, v = self.__determine_correct_v(hash_message, r, s)

        sig = self.__join_signature(r, s, v)
        print(sig)

        return sig

    def derive_address(self) -> str:
        self.__derive_ethereum_address()

        return self.address

    def __request_kms_signature(self, hashed_message: bytes) -> tuple[bytes, int, int]:
        digest = {"sha256": hashed_message}
        digest_crc32c = self.__crc32c(hashed_message)

        sign_request = kms.AsymmetricSignRequest(
            name=self.key_version_name, digest=digest, digest_crc32c=digest_crc32c
        )

        sign_response = self.client.asymmetric_sign(request=sign_request)
        print(len(sign_response.signature.hex()))
        r, s = self.__find_ethereum_sig(sign_response.signature)

        return sign_response.signature, r, s

    def __crc32c(self, data: bytes) -> int:
        crc32c_fun = mkPredefinedCrcFun("crc-32c")
        return crc32c_fun(data)

    def __pem_to_der(self, pem_key: str) -> bytes:
        # Deserialize PEM-encoded key from the provided string
        private_key = serialization.load_pem_public_key(
            pem_key.encode(),  # Convert PEM string to bytes
            backend=default_backend(),
        )

        # Serialize the key to DER format
        der_key = private_key.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

        return der_key

    def __derive_ethereum_address(self) -> None:
        if not self.address:
            public_key = self.client.get_public_key(
                request={"name": self.key_version_name}
            )
            der_public_key = self.__pem_to_der(public_key.pem)
            eth_address = keccak(primitive=der_public_key)[-20:]
            self.address = to_checksum_address(eth_address)

    def __find_ethereum_sig(self, signature: bytes) -> tuple[int, int]:
        r, s = EcdsaSig().decode_ecdsa_sig(signature)
        secp256k1_N = int(
            "fffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141", 16
        )

        s = secp256k1_N - s if s > secp256k1_N / 2 else s

        return int(r), int(s)

    def __determine_correct_v(self, message: bytes, r: int, s: int) -> tuple[str, int]:
        v = 0
        recoverd_address = self.__recover_pub_key_from_sig(message, r, s, v)
        print(recoverd_address)
        print(self.address)
        if recoverd_address.lower() != self.address.lower():
            # if the pub key for v = 27 does not match
            # it has to be v = 28
            v = 1
            recoverd_address = self.__recover_pub_key_from_sig(message, r, s, v)

        return recoverd_address, v + 27

    def __recover_pub_key_from_sig(self, message: bytes, r: int, s: int, v: int) -> str:
        signature = keys.Signature(vrs=(v, r, s))
        public_key = signature.recover_public_key_from_msg(message)
        address = public_key.to_checksum_address()

        return address

    def __join_signature(self, r: int, s: int, v: int) -> str:
        # Ensure r, s, and v are integers
        r, s, v = map(to_bytes, (r, s, v))
        signature_hex = int_to_big_endian(big_endian_to_int(r + s + v)).hex()

        return signature_hex

    def __rlp_encode_transaction(self, tx: dict) -> str:
        encoded_params = encode(
            [
                tx["chainId"],
                tx["nonce"],
                tx["maxPriorityFeePerGas"],
                tx["maxFeePerGas"],
                tx["gas"],
                to_bytes(hexstr=tx["to"]),
                tx["value"],
                to_bytes(hexstr=tx["data"]),
                [],
            ]
        )

        return self.__add_transaction_type(encoded_params.hex())   

    def __add_transaction_type(self, payload: str) -> str:
        return f"02{remove_0x_prefix(payload)}"


w3 = Web3(
    Web3.HTTPProvider(
        "RPC_URL"
    )
)

hsm = HSM(
    "innovation-sandbox-1",
    "southamerica-east1",
    "my-key-ring",
    "solidity",
    "1",
)

# Example Ethereum transaction data
nonce = w3.eth.get_transaction_count("ONE_ADDRESS")
gas_price = w3.to_wei("20", "gwei")
gas_limit = 21000
to_address = "ANOTHER_ADDRESS"
value = w3.to_wei("1", "ether")

# Create the Ethereum transaction
transaction = {
    "chainId": w3.eth.chain_id,
    "nonce": nonce,
    "maxPriorityFeePerGas": w3.eth.max_priority_fee,
    "maxFeePerGas": gas_price,
    "gas": gas_limit,
    "to": to_address,
    "value": value,
    "data": "0x",
}


hsm.sign(transaction)

如果有人能帮我解决这个问题,我将不胜感激。感谢您抽出宝贵时间接受采访。

python 密码学 以太坊 数字签名 google-cloud-kms

评论


答: 暂无答案