Initial commit

Initial commit.
This commit is contained in:
kntran1
2026-03-23 14:40:39 -05:00
parent e84b2b4166
commit 4e2a5258a5
872 changed files with 165227 additions and 0 deletions

View File

@@ -0,0 +1,148 @@
#! /usr/bin/env python3
#
# Copyright 2017 Linaro Limited
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Assemble multiple images into a single image that can be flashed on the device.
"""
import argparse
import errno
import io
import re
import os
import os.path
import pickle
import sys
def same_keys(a, b):
"""Determine if the dicts a and b have the same keys in them"""
for ak in a.keys():
if ak not in b:
return False
for bk in b.keys():
if bk not in a:
return False
return True
offset_re = re.compile(r"^#define DT_FLASH_AREA_([0-9A-Z_]+)_OFFSET(_0)?\s+(0x[0-9a-fA-F]+|[0-9]+)$")
size_re = re.compile(r"^#define DT_FLASH_AREA_([0-9A-Z_]+)_SIZE(_0)?\s+(0x[0-9a-fA-F]+|[0-9]+)$")
class Assembly():
def __init__(self, output, bootdir, edt):
self.find_slots(edt)
try:
os.unlink(output)
except OSError as e:
if e.errno != errno.ENOENT:
raise
self.output = output
def find_slots(self, edt):
offsets = {}
sizes = {}
part_nodes = edt.compat2nodes["fixed-partitions"]
for node in part_nodes:
for child in node.children.values():
if "label" in child.props:
label = child.props["label"].val
offsets[label] = child.regs[0].addr
sizes[label] = child.regs[0].size
if not same_keys(offsets, sizes):
raise Exception("Inconsistent data in devicetree.h")
# We care about the mcuboot, image-0, and image-1 partitions.
if 'mcuboot' not in offsets:
raise Exception("Board partition table does not have mcuboot partition")
if 'image-0' not in offsets:
raise Exception("Board partition table does not have image-0 partition")
if 'image-1' not in offsets:
raise Exception("Board partition table does not have image-1 partition")
self.offsets = offsets
self.sizes = sizes
def add_image(self, source, partition):
with open(self.output, 'ab') as ofd:
pos = ofd.tell()
print("partition {}, pos={}, offset={}".format(partition, pos, self.offsets[partition]))
if pos > self.offsets[partition]:
raise Exception("Partitions not in order, unsupported")
if pos < self.offsets[partition]:
buf = b'\xFF' * (self.offsets[partition] - pos)
ofd.write(buf)
with open(source, 'rb') as rfd:
ibuf = rfd.read()
if len(ibuf) > self.sizes[partition]:
raise Exception("Image {} is too large for partition".format(source))
ofd.write(ibuf)
def find_board_name(bootdir):
dot_config = os.path.join(bootdir, "zephyr", ".config")
with open(dot_config, "r") as f:
for line in f:
if line.startswith("CONFIG_BOARD="):
return line.split("=", 1)[1].strip('"')
raise Exception("Expected CONFIG_BOARD line in {}".format(dot_config))
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-b', '--bootdir', required=True,
help='Directory of built bootloader')
parser.add_argument('-p', '--primary', required=True,
help='Signed image file for primary image')
parser.add_argument('-s', '--secondary',
help='Signed image file for secondary image')
parser.add_argument('-o', '--output', required=True,
help='Filename to write full image to')
parser.add_argument('-z', '--zephyr-base',
help='Zephyr base containing the Zephyr repository')
args = parser.parse_args()
zephyr_base = args.zephyr_base
if zephyr_base is None:
try:
zephyr_base = os.environ['ZEPHYR_BASE']
except KeyError:
print('Need to either have ZEPHYR_BASE in environment or pass in -z')
sys.exit(1)
sys.path.insert(0, os.path.join(zephyr_base, "scripts", "dts", "python-devicetree", "src"))
import devicetree.edtlib
board = find_board_name(args.bootdir)
edt_pickle = os.path.join(args.bootdir, "zephyr", "edt.pickle")
with open(edt_pickle, 'rb') as f:
edt = pickle.load(f)
assert isinstance(edt, devicetree.edtlib.EDT)
output = Assembly(args.output, args.bootdir, edt)
output.add_image(os.path.join(args.bootdir, 'zephyr', 'zephyr.bin'), 'mcuboot')
output.add_image(args.primary, "image-0")
if args.secondary is not None:
output.add_image(args.secondary, "image-1")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,20 @@
#! /bin/bash
#
# SPDX-License-Identifier: Apache-2.0
source $(dirname $0)/../target.sh
lscript=/tmp/flash$$.jlink
cat >$lscript <<EOF
h
r
loadfile outdir/$BOARD/zephyr.bin $BASE_BOOT
loadfile hello.signed.bin $BASE_PRIMARY_SLOT
loadfile shell.signed.bin $BASE_SECONDARY_SLOT
q
EOF
JLinkExe -device $SOC -si SWD -speed auto \
-CommanderScript $lscript
rm $lscript

View File

@@ -0,0 +1,29 @@
#! /bin/bash
#
# SPDX-License-Identifier: Apache-2.0
source $(dirname $0)/../target.sh
gscript=/tmp/init$$.gdb
cat > $gscript <<EOF
target remote localhost:2331
symbol-file outdir/$BOARD/zephyr.elf
# symbol-file ../zephyr/samples/shell/outdir/$BOARD/zephyr.elf
# dir apps/boot/src
# dir libs/bootutil/src
# dir hw/mcu/stm/stm32f4xx/src
b main
# b __reset
# b bootutil_img_validate
# b cmp_rsasig
# b bootutil_verify_sig
# b mbedtls_rsa_public
# b boot_calloc
mon reset 2
layout src
focus cmd
EOF
$gdbexe -x $gscript
rm $gscript

View File

@@ -0,0 +1,32 @@
# SPDX-License-Identifier: Apache-2.0
#
# Nix environment for imgtool
#
# To install the environment
#
# $ nix-env --file imgtool.nix --install env-imgtool
#
# To load the environment
#
# $ load-env-imgtool
#
with import <nixpkgs> {};
let
# Nixpkgs has fairly recent versions of the dependencies, so we can
# rely on them without having to build our own derivations.
imgtoolPythonEnv = python37.withPackages (
_: [
python37.pkgs.click
python37.pkgs.cryptography
python37.pkgs.intelhex
python37.pkgs.setuptools
python37.pkgs.cbor2
python37.pkgs.pyyaml
]
);
in
myEnvFun {
name = "imgtool";
buildInputs = [ imgtoolPythonEnv ];
}

View File

@@ -0,0 +1,22 @@
#! /usr/bin/env python3
#
# Copyright 2017 Linaro Limited
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from imgtool import main
if __name__ == '__main__':
main.imgtool()

View File

@@ -0,0 +1,17 @@
# Copyright 2017-2020 Linaro Limited
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
imgtool_version = "2.1.0"

View File

@@ -0,0 +1,52 @@
# Copyright (c) 2019-2024, Arm Limited.
# Copyright (c) 2020, Linaro Limited
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
try:
from cbor2 import dumps
except ImportError:
from cbor import dumps
class SwComponent(int, Enum):
"""
Software component property IDs specified by
Arm's PSA Attestation API 1.0 document.
"""
TYPE = 1
MEASUREMENT_VALUE = 2
VERSION = 4
SIGNER_ID = 5
MEASUREMENT_DESCRIPTION = 6
def create_sw_component_data(sw_type, sw_version, sw_measurement_description,
sw_measurement_value, sw_signer_id):
# List of software component properties (Key ID + value)
properties = {SwComponent.TYPE: sw_type,
SwComponent.VERSION: sw_version,
SwComponent.SIGNER_ID: sw_signer_id,
SwComponent.MEASUREMENT_DESCRIPTION: sw_measurement_description,
SwComponent.MEASUREMENT_VALUE: sw_measurement_value,
}
# Note: The measurement value must be the last item of the property
# list because later it will be modified by the bootloader.
last_key = list(properties.keys())[-1]
assert SwComponent.MEASUREMENT_VALUE == last_key, 'Measurement value is not the last item of the property list'
return dumps(properties)

View File

@@ -0,0 +1,328 @@
# Copyright 2023-2024 Arm Limited
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Parse and print header, TLV area and trailer information of a signed image.
"""
import os.path
import struct
import sys
import click
import yaml
from imgtool import image
HEADER_ITEMS = ("magic", "load_addr", "hdr_size", "protected_tlv_size",
"img_size", "flags", "version")
TLV_TYPES = dict((value, key) for key, value in image.TLV_VALUES.items())
BOOT_MAGIC = bytes([
0x77, 0xc2, 0x95, 0xf3,
0x60, 0xd2, 0xef, 0x7f,
0x35, 0x52, 0x50, 0x0f,
0x2c, 0xb6, 0x79, 0x80, ])
BOOT_MAGIC_2 = bytes([
0x2d, 0xe1, 0x5d, 0x29,
0x41, 0x0b, 0x8d, 0x77,
0x67, 0x9c, 0x11, 0x0f,
0x1f, 0x8a, ])
BOOT_MAGIC_SIZE = len(BOOT_MAGIC)
_LINE_LENGTH = 60
STATUS = {
'0x1': 'SET',
'0x2': 'BAD',
'0x3': 'UNSET',
'0x4': 'ANY',
}
def parse_enc(key_field_len):
if key_field_len is not None:
return "(len: {}, if BOOT_SWAP_SAVE_ENCTLV is unset)".format(hex(key_field_len))
else:
return "Image not encrypted"
def parse_size(size_hex):
if size_hex == '0xffffffff':
return "unknown"
return size_hex + " octal: " + str(int(size_hex, 0))
def parse_status(status_hex):
return f"{STATUS[status_hex]} ({status_hex})" if status_hex in STATUS else f"INVALID ({status_hex})"
def parse_boot_magic(trailer_magic):
magic = ""
for i in range(BOOT_MAGIC_SIZE):
magic += "{0:#04x} ".format(trailer_magic[i])
if i == (BOOT_MAGIC_SIZE / 2 - 1):
magic += ("\n" + " ")
return magic
def print_in_frame(header_text, content):
sepc = " "
header = "#### " + header_text + sepc
post_header = "#" * (_LINE_LENGTH - len(header))
print(header + post_header)
print("|", sepc * (_LINE_LENGTH - 2), "|", sep="")
offset = (_LINE_LENGTH - len(content)) // 2
pre = "|" + (sepc * (offset - 1))
post = sepc * (_LINE_LENGTH - len(pre) - len(content) - 1) + "|"
print(pre, content, post, sep="")
print("|", sepc * (_LINE_LENGTH - 2), "|", sep="")
print("#" * _LINE_LENGTH)
def print_in_row(row_text):
row_text = "#### " + row_text + " "
fill = "#" * (_LINE_LENGTH - len(row_text))
print(row_text + fill)
def print_tlv_records(tlv_list):
indent = _LINE_LENGTH // 8
for tlv in tlv_list:
print(" " * indent, "-" * 45)
tlv_type, tlv_length, tlv_data = tlv.keys()
if tlv[tlv_type] in TLV_TYPES:
print(" " * indent, "{}: {} ({})".format(
tlv_type, TLV_TYPES[tlv[tlv_type]], hex(tlv[tlv_type])))
else:
print(" " * indent, "{}: {} ({})".format(
tlv_type, "UNKNOWN", hex(tlv[tlv_type])))
print(" " * indent, "{}: ".format(tlv_length), hex(tlv[tlv_length]))
print(" " * indent, "{}: ".format(tlv_data), end="")
for j, data in enumerate(tlv[tlv_data]):
print("{0:#04x}".format(data), end=" ")
if ((j + 1) % 8 == 0) and ((j + 1) != len(tlv[tlv_data])):
print("\n", end=" " * (indent + 7))
print()
def dump_imginfo(imgfile, outfile=None, silent=False):
"""Parse a signed image binary and print/save the available information."""
trailer_magic = None
# set to INVALID by default
swap_size = 0x99
swap_info = 0x99
copy_done = 0x99
image_ok = 0x99
trailer = {}
key_field_len = None
try:
with open(imgfile, "rb") as f:
b = f.read()
except FileNotFoundError:
raise click.UsageError("Image file not found ({})".format(imgfile))
# Parsing the image header
_header = struct.unpack('IIHHIIBBHI', b[:28])
# Image version consists of the last 4 item ('BBHI')
_version = _header[-4:]
header = {}
for i, key in enumerate(HEADER_ITEMS):
if key == "version":
header[key] = "{}.{}.{}+{}".format(*_version)
else:
header[key] = _header[i]
# Parsing the TLV area
tlv_area = {"tlv_hdr_prot": {},
"tlvs_prot": [],
"tlv_hdr": {},
"tlvs": []}
tlv_off = header["hdr_size"] + header["img_size"]
protected_tlv_size = header["protected_tlv_size"]
if protected_tlv_size != 0:
_tlv_prot_head = struct.unpack(
'HH',
b[tlv_off:(tlv_off + image.TLV_INFO_SIZE)])
tlv_area["tlv_hdr_prot"]["magic"] = _tlv_prot_head[0]
tlv_area["tlv_hdr_prot"]["tlv_tot"] = _tlv_prot_head[1]
tlv_end = tlv_off + tlv_area["tlv_hdr_prot"]["tlv_tot"]
tlv_off += image.TLV_INFO_SIZE
# Iterating through the protected TLV area
while tlv_off < tlv_end:
tlv_type, tlv_len = struct.unpack(
'HH',
b[tlv_off:(tlv_off + image.TLV_INFO_SIZE)])
tlv_off += image.TLV_INFO_SIZE
tlv_data = b[tlv_off:(tlv_off + tlv_len)]
tlv_area["tlvs_prot"].append(
{"type": tlv_type, "len": tlv_len, "data": tlv_data})
tlv_off += tlv_len
_tlv_head = struct.unpack('HH', b[tlv_off:(tlv_off + image.TLV_INFO_SIZE)])
tlv_area["tlv_hdr"]["magic"] = _tlv_head[0]
tlv_area["tlv_hdr"]["tlv_tot"] = _tlv_head[1]
tlv_end = tlv_off + tlv_area["tlv_hdr"]["tlv_tot"]
tlv_off += image.TLV_INFO_SIZE
# Iterating through the TLV area
while tlv_off < tlv_end:
tlv_type, tlv_len = struct.unpack(
'HH',
b[tlv_off:(tlv_off + image.TLV_INFO_SIZE)])
tlv_off += image.TLV_INFO_SIZE
tlv_data = b[tlv_off:(tlv_off + tlv_len)]
tlv_area["tlvs"].append(
{"type": tlv_type, "len": tlv_len, "data": tlv_data})
tlv_off += tlv_len
_img_pad_size = len(b) - tlv_end
if _img_pad_size:
# Parsing the image trailer
trailer_off = -BOOT_MAGIC_SIZE
trailer_magic = b[trailer_off:]
trailer["magic"] = trailer_magic
max_align = None
if trailer_magic == BOOT_MAGIC:
# The maximum supported write alignment is the default 8 Bytes
max_align = 8
elif trailer_magic[-len(BOOT_MAGIC_2):] == BOOT_MAGIC_2:
# The alignment value is encoded in the magic field
max_align = int.from_bytes(trailer_magic[:2], "little")
else:
# Invalid magic: the rest of the image trailer cannot be processed.
print("Warning: the trailer magic value is invalid!")
if max_align is not None:
if max_align > BOOT_MAGIC_SIZE:
trailer_off -= max_align - BOOT_MAGIC_SIZE
# Parsing rest of the trailer fields
trailer_off -= max_align
image_ok = b[trailer_off]
trailer["image_ok"] = image_ok
trailer_off -= max_align
copy_done = b[trailer_off]
trailer["copy_done"] = copy_done
trailer_off -= max_align
swap_info = b[trailer_off]
trailer["swap_info"] = swap_info
trailer_off -= max_align
swap_size = int.from_bytes(b[trailer_off:(trailer_off + 4)],
"little")
trailer["swap_size"] = swap_size
# Encryption key 0/1
if ((header["flags"] & image.IMAGE_F["ENCRYPTED_AES128"]) or
(header["flags"] & image.IMAGE_F["ENCRYPTED_AES256"])):
# The image is encrypted
# Estimated value of key_field_len is correct if
# BOOT_SWAP_SAVE_ENCTLV is unset
key_field_len = image.align_up(16, max_align) * 2
# Generating output yaml file
if outfile is not None:
imgdata = {"header": header,
"tlv_area": tlv_area,
"trailer": trailer}
with open(outfile, "w") as outf:
# sort_keys - from pyyaml 5.1
yaml.dump(imgdata, outf, sort_keys=False)
###############################################################################
if silent:
sys.exit(0)
print("Printing content of signed image:", os.path.basename(imgfile), "\n")
# Image header
section_name = "Image header (offset: 0x0)"
print_in_row(section_name)
for key, value in header.items():
if key == "flags":
if not value:
flag_string = hex(value)
else:
flag_string = ""
for flag in image.IMAGE_F.keys():
if value & image.IMAGE_F[flag]:
if flag_string:
flag_string += ("\n" + (" " * 20))
flag_string += "{} ({})".format(
flag, hex(image.IMAGE_F[flag]))
value = flag_string
if not isinstance(value, str):
value = hex(value)
print(key, ":", " " * (19 - len(key)), value, sep="")
print("#" * _LINE_LENGTH)
# Image payload
_sectionoff = header["hdr_size"]
frame_header_text = "Payload (offset: {})".format(hex(_sectionoff))
frame_content = "FW image (size: {} Bytes)".format(hex(header["img_size"]))
print_in_frame(frame_header_text, frame_content)
# TLV area
_sectionoff += header["img_size"]
if protected_tlv_size != 0:
# Protected TLV area
section_name = "Protected TLV area (offset: {})".format(hex(_sectionoff))
print_in_row(section_name)
print("magic: ", hex(tlv_area["tlv_hdr_prot"]["magic"]))
print("area size:", hex(tlv_area["tlv_hdr_prot"]["tlv_tot"]))
print_tlv_records(tlv_area["tlvs_prot"])
print("#" * _LINE_LENGTH)
_sectionoff += protected_tlv_size
section_name = "TLV area (offset: {})".format(hex(_sectionoff))
print_in_row(section_name)
print("magic: ", hex(tlv_area["tlv_hdr"]["magic"]))
print("area size:", hex(tlv_area["tlv_hdr"]["tlv_tot"]))
print_tlv_records(tlv_area["tlvs"])
print("#" * _LINE_LENGTH)
if _img_pad_size:
_sectionoff += tlv_area["tlv_hdr"]["tlv_tot"]
_erased_val = b[_sectionoff]
frame_header_text = "Image padding (offset: {})".format(hex(_sectionoff))
frame_content = "padding ({})".format(hex(_erased_val))
print_in_frame(frame_header_text, frame_content)
# Image trailer
section_name = "Image trailer (offset: unknown)"
print_in_row(section_name)
notice = "(Note: some fields may not be used, depending on the update strategy)\n"
notice = '\n'.join(notice[i:i + _LINE_LENGTH] for i in range(0, len(notice), _LINE_LENGTH))
print(notice)
print("swap status: (len: unknown)")
print("enc. keys: ", parse_enc(key_field_len))
print("swap size: ", parse_size(hex(swap_size)))
print("swap_info: ", parse_status(hex(swap_info)))
print("copy_done: ", parse_status(hex(copy_done)))
print("image_ok: ", parse_status(hex(image_ok)))
print("boot magic: ", parse_boot_magic(trailer_magic))
print()
footer = "End of Image "
print_in_row(footer)

View File

@@ -0,0 +1,880 @@
# Copyright 2018 Nordic Semiconductor ASA
# Copyright 2017-2020 Linaro Limited
# Copyright 2019-2024 Arm Limited
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Image signing and management.
"""
from . import version as versmod
from .boot_record import create_sw_component_data
import click
import copy
from enum import Enum
import array
from intelhex import IntelHex
import hashlib
import array
import os.path
import struct
from enum import Enum
import click
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.primitives.asymmetric import ec, padding
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from intelhex import IntelHex
from . import version as versmod, keys
from .boot_record import create_sw_component_data
from .keys import rsa, ecdsa, x25519
from collections import namedtuple
IMAGE_MAGIC = 0x96f3b83d
IMAGE_HEADER_SIZE = 32
BIN_EXT = "bin"
INTEL_HEX_EXT = "hex"
DEFAULT_MAX_SECTORS = 128
DEFAULT_MAX_ALIGN = 8
DEP_IMAGES_KEY = "images"
DEP_VERSIONS_KEY = "versions"
MAX_SW_TYPE_LENGTH = 12 # Bytes
# Image header flags.
IMAGE_F = {
'PIC': 0x0000001,
'ENCRYPTED_AES128': 0x0000004,
'ENCRYPTED_AES256': 0x0000008,
'NON_BOOTABLE': 0x0000010,
'RAM_LOAD': 0x0000020,
'ROM_FIXED': 0x0000100,
'COMPRESSED_LZMA1': 0x0000200,
'COMPRESSED_LZMA2': 0x0000400,
'COMPRESSED_ARM_THUMB': 0x0000800,
}
TLV_VALUES = {
'KEYHASH': 0x01,
'PUBKEY': 0x02,
'SHA256': 0x10,
'SHA384': 0x11,
'SHA512': 0x12,
'RSA2048': 0x20,
'ECDSASIG': 0x22,
'RSA3072': 0x23,
'ED25519': 0x24,
'SIG_PURE': 0x25,
'ENCRSA2048': 0x30,
'ENCKW': 0x31,
'ENCEC256': 0x32,
'ENCX25519': 0x33,
'DEPENDENCY': 0x40,
'SEC_CNT': 0x50,
'BOOT_RECORD': 0x60,
'DECOMP_SIZE': 0x70,
'DECOMP_SHA': 0x71,
'DECOMP_SIGNATURE': 0x72,
}
TLV_SIZE = 4
TLV_INFO_SIZE = 4
TLV_INFO_MAGIC = 0x6907
TLV_PROT_INFO_MAGIC = 0x6908
TLV_VENDOR_RES_MIN = 0x00a0
TLV_VENDOR_RES_MAX = 0xfffe
STRUCT_ENDIAN_DICT = {
'little': '<',
'big': '>'
}
VerifyResult = Enum('VerifyResult',
['OK', 'INVALID_MAGIC', 'INVALID_TLV_INFO_MAGIC', 'INVALID_HASH', 'INVALID_SIGNATURE',
'KEY_MISMATCH'])
def align_up(num, align):
assert (align & (align - 1) == 0) and align != 0
return (num + (align - 1)) & ~(align - 1)
class TLV():
def __init__(self, endian, magic=TLV_INFO_MAGIC):
self.magic = magic
self.buf = bytearray()
self.endian = endian
def __len__(self):
return TLV_INFO_SIZE + len(self.buf)
def add(self, kind, payload):
"""
Add a TLV record. Kind should be a string found in TLV_VALUES above.
"""
e = STRUCT_ENDIAN_DICT[self.endian]
if isinstance(kind, int):
if not TLV_VENDOR_RES_MIN <= kind <= TLV_VENDOR_RES_MAX:
msg = "Invalid custom TLV type value '0x{:04x}', allowed " \
"value should be between 0x{:04x} and 0x{:04x}".format(
kind, TLV_VENDOR_RES_MIN, TLV_VENDOR_RES_MAX)
raise click.UsageError(msg)
buf = struct.pack(e + 'HH', kind, len(payload))
else:
buf = struct.pack(e + 'BBH', TLV_VALUES[kind], 0, len(payload))
self.buf += buf
self.buf += payload
def get(self):
if len(self.buf) == 0:
return bytes()
e = STRUCT_ENDIAN_DICT[self.endian]
header = struct.pack(e + 'HH', self.magic, len(self))
return header + bytes(self.buf)
SHAAndAlgT = namedtuple('SHAAndAlgT', ['sha', 'alg'])
TLV_SHA_TO_SHA_AND_ALG = {
TLV_VALUES['SHA256'] : SHAAndAlgT('256', hashlib.sha256),
TLV_VALUES['SHA384'] : SHAAndAlgT('384', hashlib.sha384),
TLV_VALUES['SHA512'] : SHAAndAlgT('512', hashlib.sha512),
}
USER_SHA_TO_ALG_AND_TLV = {
'auto' : (hashlib.sha256, 'SHA256'),
'256' : (hashlib.sha256, 'SHA256'),
'384' : (hashlib.sha384, 'SHA384'),
'512' : (hashlib.sha512, 'SHA512')
}
def is_sha_tlv(tlv):
return tlv in TLV_SHA_TO_SHA_AND_ALG.keys()
def tlv_sha_to_sha(tlv):
return TLV_SHA_TO_SHA_AND_ALG[tlv].sha
# Auto selecting hash algorithm for type(key)
ALLOWED_KEY_SHA = {
keys.ECDSA384P1 : ['384'],
keys.ECDSA384P1Public : ['384'],
keys.ECDSA256P1 : ['256'],
keys.RSA : ['256'],
# This two are set to 256 for compatibility, the right would be 512
keys.Ed25519 : ['256', '512'],
keys.X25519 : ['256', '512']
}
ALLOWED_PURE_KEY_SHA = {
keys.Ed25519 : ['512']
}
ALLOWED_PURE_SIG_TLVS = [
TLV_VALUES['ED25519']
]
def key_and_user_sha_to_alg_and_tlv(key, user_sha, is_pure = False):
"""Matches key and user requested sha to sha alogrithm and TLV name.
The returned tuple will contain hash functions and TVL name.
The function is designed to succeed or completely fail execution,
as providing incorrect pair here basically prevents doing
any more work.
"""
if key is None:
# If key is none, we allow whatever user has selected for sha
return USER_SHA_TO_ALG_AND_TLV[user_sha]
# If key is not None, then we have to filter hash to only allowed
allowed = None
allowed_key_ssh = ALLOWED_PURE_KEY_SHA if is_pure else ALLOWED_KEY_SHA
try:
allowed = allowed_key_ssh[type(key)]
except KeyError:
raise click.UsageError("Colud not find allowed hash algorithms for {}"
.format(type(key)))
# Pure enforces auto, and user selection is ignored
if user_sha == 'auto' or is_pure:
return USER_SHA_TO_ALG_AND_TLV[allowed[0]]
if user_sha in allowed:
return USER_SHA_TO_ALG_AND_TLV[user_sha]
raise click.UsageError("Key {} can not be used with --sha {}; allowed sha are one of {}"
.format(key.sig_type(), user_sha, allowed))
def get_digest(tlv_type, hash_region):
sha = TLV_SHA_TO_SHA_AND_ALG[tlv_type].alg()
sha.update(hash_region)
return sha.digest()
def tlv_matches_key_type(tlv_type, key):
"""Check if provided key matches to TLV record in the image"""
try:
# We do not need the result here, and the key_and_user_sha_to_alg_and_tlv
# will either succeed finding match or rise exception, so on success we
# return True, on exception we return False.
_, _ = key_and_user_sha_to_alg_and_tlv(key, tlv_sha_to_sha(tlv_type))
return True
except:
pass
return False
class Image:
def __init__(self, version=None, header_size=IMAGE_HEADER_SIZE,
pad_header=False, pad=False, confirm=False, align=1,
slot_size=0, max_sectors=DEFAULT_MAX_SECTORS,
overwrite_only=False, endian="little", load_addr=0,
rom_fixed=None, erased_val=None, save_enctlv=False,
security_counter=None, max_align=None,
non_bootable=False):
if load_addr and rom_fixed:
raise click.UsageError("Can not set rom_fixed and load_addr at the same time")
self.image_hash = None
self.image_size = None
self.signature = None
self.version = version or versmod.decode_version("0")
self.header_size = header_size
self.pad_header = pad_header
self.pad = pad
self.confirm = confirm
self.align = align
self.slot_size = slot_size
self.max_sectors = max_sectors
self.overwrite_only = overwrite_only
self.endian = endian
self.base_addr = None
self.load_addr = 0 if load_addr is None else load_addr
self.rom_fixed = rom_fixed
self.erased_val = 0xff if erased_val is None else int(erased_val, 0)
self.payload = []
self.infile_data = []
self.enckey = None
self.save_enctlv = save_enctlv
self.enctlv_len = 0
self.max_align = max(DEFAULT_MAX_ALIGN, align) if max_align is None else int(max_align)
self.non_bootable = non_bootable
if self.max_align == DEFAULT_MAX_ALIGN:
self.boot_magic = bytes([
0x77, 0xc2, 0x95, 0xf3,
0x60, 0xd2, 0xef, 0x7f,
0x35, 0x52, 0x50, 0x0f,
0x2c, 0xb6, 0x79, 0x80, ])
else:
lsb = self.max_align & 0x00ff
msb = (self.max_align & 0xff00) >> 8
align = bytes([msb, lsb]) if self.endian == "big" else bytes([lsb, msb])
self.boot_magic = align + bytes([0x2d, 0xe1,
0x5d, 0x29, 0x41, 0x0b,
0x8d, 0x77, 0x67, 0x9c,
0x11, 0x0f, 0x1f, 0x8a, ])
if security_counter == 'auto':
# Security counter has not been explicitly provided,
# generate it from the version number
self.security_counter = ((self.version.major << 24)
+ (self.version.minor << 16)
+ self.version.revision)
else:
self.security_counter = security_counter
def __repr__(self):
return "<Image version={}, header_size={}, security_counter={}, \
base_addr={}, load_addr={}, align={}, slot_size={}, \
max_sectors={}, overwrite_only={}, endian={} format={}, \
payloadlen=0x{:x}>".format(
self.version,
self.header_size,
self.security_counter,
self.base_addr if self.base_addr is not None else "N/A",
self.load_addr,
self.align,
self.slot_size,
self.max_sectors,
self.overwrite_only,
self.endian,
self.__class__.__name__,
len(self.payload))
def load(self, path):
"""Load an image from a given file"""
ext = os.path.splitext(path)[1][1:].lower()
try:
if ext == INTEL_HEX_EXT:
ih = IntelHex(path)
self.infile_data = ih.tobinarray()
self.payload = copy.copy(self.infile_data)
self.base_addr = ih.minaddr()
else:
with open(path, 'rb') as f:
self.infile_data = f.read()
self.payload = copy.copy(self.infile_data)
except FileNotFoundError:
raise click.UsageError("Input file not found")
self.image_size = len(self.payload)
# Add the image header if needed.
if self.pad_header and self.header_size > 0:
if self.base_addr:
# Adjust base_addr for new header
self.base_addr -= self.header_size
self.payload = bytes([self.erased_val] * self.header_size) + \
self.payload
self.check_header()
def load_compressed(self, data, compression_header):
"""Load an image from buffer"""
self.payload = compression_header + data
self.image_size = len(self.payload)
# Add the image header if needed.
if self.pad_header and self.header_size > 0:
if self.base_addr:
# Adjust base_addr for new header
self.base_addr -= self.header_size
self.payload = bytes([self.erased_val] * self.header_size) + \
self.payload
self.check_header()
def save(self, path, hex_addr=None):
"""Save an image from a given file"""
ext = os.path.splitext(path)[1][1:].lower()
if ext == INTEL_HEX_EXT:
# input was in binary format, but HEX needs to know the base addr
if self.base_addr is None and hex_addr is None:
raise click.UsageError("No address exists in input file "
"neither was it provided by user")
h = IntelHex()
if hex_addr is not None:
self.base_addr = hex_addr
h.frombytes(bytes=self.payload, offset=self.base_addr)
if self.pad:
trailer_size = self._trailer_size(self.align, self.max_sectors,
self.overwrite_only,
self.enckey,
self.save_enctlv,
self.enctlv_len)
trailer_addr = (self.base_addr + self.slot_size) - trailer_size
if self.confirm and not self.overwrite_only:
magic_align_size = align_up(len(self.boot_magic),
self.max_align)
image_ok_idx = -(magic_align_size + self.max_align)
flag = bytearray([self.erased_val] * self.max_align)
flag[0] = 0x01 # image_ok = 0x01
h.puts(trailer_addr + trailer_size + image_ok_idx,
bytes(flag))
h.puts(trailer_addr + (trailer_size - len(self.boot_magic)),
bytes(self.boot_magic))
h.tofile(path, 'hex')
else:
if self.pad:
self.pad_to(self.slot_size)
with open(path, 'wb') as f:
f.write(self.payload)
def check_header(self):
if self.header_size > 0 and not self.pad_header:
if any(v != 0 for v in self.payload[0:self.header_size]):
raise click.UsageError("Header padding was not requested and "
"image does not start with zeros")
def check_trailer(self):
if self.slot_size > 0:
tsize = self._trailer_size(self.align, self.max_sectors,
self.overwrite_only, self.enckey,
self.save_enctlv, self.enctlv_len)
padding = self.slot_size - (len(self.payload) + tsize)
if padding < 0:
msg = "Image size (0x{:x}) + trailer (0x{:x}) exceeds " \
"requested size 0x{:x}".format(
len(self.payload), tsize, self.slot_size)
raise click.UsageError(msg)
def ecies_hkdf(self, enckey, plainkey):
if isinstance(enckey, ecdsa.ECDSA256P1Public):
newpk = ec.generate_private_key(ec.SECP256R1(), default_backend())
shared = newpk.exchange(ec.ECDH(), enckey._get_public())
else:
newpk = X25519PrivateKey.generate()
shared = newpk.exchange(enckey._get_public())
derived_key = HKDF(
algorithm=hashes.SHA256(), length=48, salt=None,
info=b'MCUBoot_ECIES_v1', backend=default_backend()).derive(shared)
encryptor = Cipher(algorithms.AES(derived_key[:16]),
modes.CTR(bytes([0] * 16)),
backend=default_backend()).encryptor()
cipherkey = encryptor.update(plainkey) + encryptor.finalize()
mac = hmac.HMAC(derived_key[16:], hashes.SHA256(),
backend=default_backend())
mac.update(cipherkey)
ciphermac = mac.finalize()
if isinstance(enckey, ecdsa.ECDSA256P1Public):
pubk = newpk.public_key().public_bytes(
encoding=Encoding.X962,
format=PublicFormat.UncompressedPoint)
else:
pubk = newpk.public_key().public_bytes(
encoding=Encoding.Raw,
format=PublicFormat.Raw)
return cipherkey, ciphermac, pubk
def create(self, key, public_key_format, enckey, dependencies=None,
sw_type=None, custom_tlvs=None, compression_tlvs=None,
compression_type=None, encrypt_keylen=128, clear=False,
fixed_sig=None, pub_key=None, vector_to_sign=None,
user_sha='auto', is_pure=False):
self.enckey = enckey
# key decides on sha, then pub_key; of both are none default is used
check_key = key if key is not None else pub_key
hash_algorithm, hash_tlv = key_and_user_sha_to_alg_and_tlv(check_key, user_sha, is_pure)
# Calculate the hash of the public key
if key is not None:
pub = key.get_public_bytes()
sha = hash_algorithm()
sha.update(pub)
pubbytes = sha.digest()
elif pub_key is not None:
if hasattr(pub_key, 'sign'):
print(os.path.basename(__file__) + ": sign the payload")
pub = pub_key.get_public_bytes()
sha = hash_algorithm()
sha.update(pub)
pubbytes = sha.digest()
else:
pubbytes = bytes(hashlib.sha256().digest_size)
protected_tlv_size = 0
if self.security_counter is not None:
# Size of the security counter TLV: header ('HH') + payload ('I')
# = 4 + 4 = 8 Bytes
protected_tlv_size += TLV_SIZE + 4
if sw_type is not None:
if len(sw_type) > MAX_SW_TYPE_LENGTH:
msg = "'{}' is too long ({} characters) for sw_type. Its " \
"maximum allowed length is 12 characters.".format(
sw_type, len(sw_type))
raise click.UsageError(msg)
image_version = (str(self.version.major) + '.'
+ str(self.version.minor) + '.'
+ str(self.version.revision))
# The image hash is computed over the image header, the image
# itself and the protected TLV area. However, the boot record TLV
# (which is part of the protected area) should contain this hash
# before it is even calculated. For this reason the script fills
# this field with zeros and the bootloader will insert the right
# value later.
digest = bytes(hash_algorithm().digest_size)
# Create CBOR encoded boot record
boot_record = create_sw_component_data(sw_type, image_version,
hash_tlv, digest,
pubbytes)
protected_tlv_size += TLV_SIZE + len(boot_record)
if dependencies is not None:
# Size of a Dependency TLV = Header ('HH') + Payload('IBBHI')
# = 4 + 12 = 16 Bytes
dependencies_num = len(dependencies[DEP_IMAGES_KEY])
protected_tlv_size += (dependencies_num * 16)
if compression_tlvs is not None:
for value in compression_tlvs.values():
protected_tlv_size += TLV_SIZE + len(value)
if custom_tlvs is not None:
for value in custom_tlvs.values():
protected_tlv_size += TLV_SIZE + len(value)
if protected_tlv_size != 0:
# Add the size of the TLV info header
protected_tlv_size += TLV_INFO_SIZE
# At this point the image is already on the payload
#
# This adds the padding if image is not aligned to the 16 Bytes
# in encrypted mode
if self.enckey is not None:
pad_len = len(self.payload) % 16
if pad_len > 0:
pad = bytes(16 - pad_len)
if isinstance(self.payload, bytes):
self.payload += pad
else:
self.payload.extend(pad)
compression_flags = 0x0
if compression_tlvs is not None:
if compression_type in ["lzma2", "lzma2armthumb"]:
compression_flags = IMAGE_F['COMPRESSED_LZMA2']
if compression_type == "lzma2armthumb":
compression_flags |= IMAGE_F['COMPRESSED_ARM_THUMB']
# This adds the header to the payload as well
if encrypt_keylen == 256:
self.add_header(enckey, protected_tlv_size, compression_flags, 256)
else:
self.add_header(enckey, protected_tlv_size, compression_flags)
prot_tlv = TLV(self.endian, TLV_PROT_INFO_MAGIC)
# Protected TLVs must be added first, because they are also included
# in the hash calculation
protected_tlv_off = None
if protected_tlv_size != 0:
e = STRUCT_ENDIAN_DICT[self.endian]
if self.security_counter is not None:
payload = struct.pack(e + 'I', self.security_counter)
prot_tlv.add('SEC_CNT', payload)
if sw_type is not None:
prot_tlv.add('BOOT_RECORD', boot_record)
if dependencies is not None:
for i in range(dependencies_num):
payload = struct.pack(
e + 'B3x' + 'BBHI',
int(dependencies[DEP_IMAGES_KEY][i]),
dependencies[DEP_VERSIONS_KEY][i].major,
dependencies[DEP_VERSIONS_KEY][i].minor,
dependencies[DEP_VERSIONS_KEY][i].revision,
dependencies[DEP_VERSIONS_KEY][i].build
)
prot_tlv.add('DEPENDENCY', payload)
if compression_tlvs is not None:
for tag, value in compression_tlvs.items():
prot_tlv.add(tag, value)
if custom_tlvs is not None:
for tag, value in custom_tlvs.items():
prot_tlv.add(tag, value)
protected_tlv_off = len(self.payload)
self.payload += prot_tlv.get()
tlv = TLV(self.endian)
# These signature is done over sha of image. In case of
# EC signatures so called Pure algorithm, designated to be run
# over entire message is used with sha of image as message,
# so, for example, in case of ED25519 we have here SHAxxx-ED25519-SHA512.
sha = hash_algorithm()
sha.update(self.payload)
digest = sha.digest()
tlv.add(hash_tlv, digest)
# for external usage
self.image_hash = digest
# Unless pure, we are signing digest.
message = digest
if is_pure:
# Note that when Pure signature is used, hash TLV is not present.
message = bytes(self.payload)
e = STRUCT_ENDIAN_DICT[self.endian]
sig_pure = struct.pack(e + '?', True)
tlv.add('SIG_PURE', sig_pure)
if vector_to_sign == 'payload':
# Stop amending data to the image
# Just keep data vector which is expected to be signed
print(os.path.basename(__file__) + ': export payload')
return
elif vector_to_sign == 'digest':
self.payload = digest
print(os.path.basename(__file__) + ': export digest')
return
if key is not None or fixed_sig is not None:
if public_key_format == 'hash':
tlv.add('KEYHASH', pubbytes)
else:
tlv.add('PUBKEY', pub)
if key is not None and fixed_sig is None:
# `sign` expects the full image payload (hashing done
# internally), while `sign_digest` expects only the digest
# of the payload
if hasattr(key, 'sign'):
print(os.path.basename(__file__) + ": sign the payload")
sig = key.sign(bytes(self.payload))
else:
print(os.path.basename(__file__) + ": sign the digest")
sig = key.sign_digest(message)
tlv.add(key.sig_tlv(), sig)
self.signature = sig
elif fixed_sig is not None and key is None:
tlv.add(pub_key.sig_tlv(), fixed_sig['value'])
self.signature = fixed_sig['value']
else:
raise click.UsageError("Can not sign using key and provide fixed-signature at the same time")
# At this point the image was hashed + signed, we can remove the
# protected TLVs from the payload (will be re-added later)
if protected_tlv_off is not None:
self.payload = self.payload[:protected_tlv_off]
if enckey is not None:
if encrypt_keylen == 256:
plainkey = os.urandom(32)
else:
plainkey = os.urandom(16)
if isinstance(enckey, rsa.RSAPublic):
cipherkey = enckey._get_public().encrypt(
plainkey, padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None))
self.enctlv_len = len(cipherkey)
tlv.add('ENCRSA2048', cipherkey)
elif isinstance(enckey, (ecdsa.ECDSA256P1Public,
x25519.X25519Public)):
cipherkey, mac, pubk = self.ecies_hkdf(enckey, plainkey)
enctlv = pubk + mac + cipherkey
self.enctlv_len = len(enctlv)
if isinstance(enckey, ecdsa.ECDSA256P1Public):
tlv.add('ENCEC256', enctlv)
else:
tlv.add('ENCX25519', enctlv)
if not clear:
nonce = bytes([0] * 16)
cipher = Cipher(algorithms.AES(plainkey), modes.CTR(nonce),
backend=default_backend())
encryptor = cipher.encryptor()
img = bytes(self.payload[self.header_size:])
self.payload[self.header_size:] = \
encryptor.update(img) + encryptor.finalize()
self.payload += prot_tlv.get()
self.payload += tlv.get()
self.check_trailer()
def get_struct_endian(self):
return STRUCT_ENDIAN_DICT[self.endian]
def get_signature(self):
return self.signature
def get_infile_data(self):
return self.infile_data
def add_header(self, enckey, protected_tlv_size, compression_flags, aes_length=128):
"""Install the image header."""
flags = 0
if enckey is not None:
if aes_length == 128:
flags |= IMAGE_F['ENCRYPTED_AES128']
else:
flags |= IMAGE_F['ENCRYPTED_AES256']
if self.load_addr != 0:
# Indicates that this image should be loaded into RAM
# instead of run directly from flash.
flags |= IMAGE_F['RAM_LOAD']
if self.rom_fixed:
flags |= IMAGE_F['ROM_FIXED']
if self.non_bootable:
flags |= IMAGE_F['NON_BOOTABLE']
e = STRUCT_ENDIAN_DICT[self.endian]
fmt = (e +
# type ImageHdr struct {
'I' + # Magic uint32
'I' + # LoadAddr uint32
'H' + # HdrSz uint16
'H' + # PTLVSz uint16
'I' + # ImgSz uint32
'I' + # Flags uint32
'BBHI' + # Vers ImageVersion
'I' # Pad1 uint32
) # }
assert struct.calcsize(fmt) == IMAGE_HEADER_SIZE
header = struct.pack(fmt,
IMAGE_MAGIC,
self.rom_fixed or self.load_addr,
self.header_size,
protected_tlv_size, # TLV Info header +
# Protected TLVs
len(self.payload) - self.header_size, # ImageSz
flags | compression_flags,
self.version.major,
self.version.minor or 0,
self.version.revision or 0,
self.version.build or 0,
0) # Pad1
self.payload = bytearray(self.payload)
self.payload[:len(header)] = header
def _trailer_size(self, write_size, max_sectors, overwrite_only, enckey,
save_enctlv, enctlv_len):
# NOTE: should already be checked by the argument parser
magic_size = 16
magic_align_size = align_up(magic_size, self.max_align)
if overwrite_only:
return self.max_align * 2 + magic_align_size
else:
if write_size not in set([1, 2, 4, 8, 16, 32]):
raise click.BadParameter("Invalid alignment: {}".format(
write_size))
m = DEFAULT_MAX_SECTORS if max_sectors is None else max_sectors
trailer = m * 3 * write_size # status area
if enckey is not None:
if save_enctlv:
# TLV saved by the bootloader is aligned
keylen = align_up(enctlv_len, self.max_align)
else:
keylen = align_up(16, self.max_align)
trailer += keylen * 2 # encryption keys
trailer += self.max_align * 4 # image_ok/copy_done/swap_info/swap_size
trailer += magic_align_size
return trailer
def pad_to(self, size):
"""Pad the image to the given size, with the given flash alignment."""
tsize = self._trailer_size(self.align, self.max_sectors,
self.overwrite_only, self.enckey,
self.save_enctlv, self.enctlv_len)
padding = size - (len(self.payload) + tsize)
pbytes = bytearray([self.erased_val] * padding)
pbytes += bytearray([self.erased_val] * (tsize - len(self.boot_magic)))
pbytes += self.boot_magic
if self.confirm and not self.overwrite_only:
magic_size = 16
magic_align_size = align_up(magic_size, self.max_align)
image_ok_idx = -(magic_align_size + self.max_align)
pbytes[image_ok_idx] = 0x01 # image_ok = 0x01
self.payload += pbytes
@staticmethod
def verify(imgfile, key):
ext = os.path.splitext(imgfile)[1][1:].lower()
try:
if ext == INTEL_HEX_EXT:
b = IntelHex(imgfile).tobinstr()
else:
with open(imgfile, 'rb') as f:
b = f.read()
except FileNotFoundError:
raise click.UsageError(f"Image file {imgfile} not found")
magic, _, header_size, _, img_size = struct.unpack('IIHHI', b[:16])
version = struct.unpack('BBHI', b[20:28])
if magic != IMAGE_MAGIC:
return VerifyResult.INVALID_MAGIC, None, None, None
tlv_off = header_size + img_size
tlv_info = b[tlv_off:tlv_off + TLV_INFO_SIZE]
magic, tlv_tot = struct.unpack('HH', tlv_info)
if magic == TLV_PROT_INFO_MAGIC:
tlv_off += tlv_tot
tlv_info = b[tlv_off:tlv_off + TLV_INFO_SIZE]
magic, tlv_tot = struct.unpack('HH', tlv_info)
if magic != TLV_INFO_MAGIC:
return VerifyResult.INVALID_TLV_INFO_MAGIC, None, None, None
# This is set by existence of TLV SIG_PURE
is_pure = False
prot_tlv_size = tlv_off
hash_region = b[:prot_tlv_size]
tlv_end = tlv_off + tlv_tot
tlv_off += TLV_INFO_SIZE # skip tlv info
# First scan all TLVs in search of SIG_PURE
while tlv_off < tlv_end:
tlv = b[tlv_off:tlv_off + TLV_SIZE]
tlv_type, _, tlv_len = struct.unpack('BBH', tlv)
if tlv_type == TLV_VALUES['SIG_PURE']:
is_pure = True
break
tlv_off += TLV_SIZE + tlv_len
digest = None
tlv_off = header_size + img_size
tlv_end = tlv_off + tlv_tot
tlv_off += TLV_INFO_SIZE # skip tlv info
while tlv_off < tlv_end:
tlv = b[tlv_off:tlv_off + TLV_SIZE]
tlv_type, _, tlv_len = struct.unpack('BBH', tlv)
if is_sha_tlv(tlv_type):
if not tlv_matches_key_type(tlv_type, key):
return VerifyResult.KEY_MISMATCH, None, None, None
off = tlv_off + TLV_SIZE
digest = get_digest(tlv_type, hash_region)
if digest == b[off:off + tlv_len]:
if key is None:
return VerifyResult.OK, version, digest, None
else:
return VerifyResult.INVALID_HASH, None, None, None
elif not is_pure and key is not None and tlv_type == TLV_VALUES[key.sig_tlv()]:
off = tlv_off + TLV_SIZE
tlv_sig = b[off:off + tlv_len]
payload = b[:prot_tlv_size]
try:
if hasattr(key, 'verify'):
key.verify(tlv_sig, payload)
else:
key.verify_digest(tlv_sig, digest)
return VerifyResult.OK, version, digest, None
except InvalidSignature:
# continue to next TLV
pass
elif is_pure and key is not None and tlv_type in ALLOWED_PURE_SIG_TLVS:
off = tlv_off + TLV_SIZE
tlv_sig = b[off:off + tlv_len]
try:
key.verify_digest(tlv_sig, hash_region)
return VerifyResult.OK, version, None, tlv_sig
except InvalidSignature:
# continue to next TLV
pass
tlv_off += TLV_SIZE + tlv_len
return VerifyResult.INVALID_SIGNATURE, None, None, None

View File

@@ -0,0 +1,105 @@
# Copyright 2017 Linaro Limited
# Copyright 2023 Arm Limited
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Cryptographic key management for imgtool.
"""
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.rsa import (
RSAPrivateKey, RSAPublicKey)
from cryptography.hazmat.primitives.asymmetric.ec import (
EllipticCurvePrivateKey, EllipticCurvePublicKey)
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey, Ed25519PublicKey)
from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey, X25519PublicKey)
from .rsa import RSA, RSAPublic, RSAUsageError, RSA_KEY_SIZES
from .ecdsa import (ECDSA256P1, ECDSA256P1Public,
ECDSA384P1, ECDSA384P1Public, ECDSAUsageError)
from .ed25519 import Ed25519, Ed25519Public, Ed25519UsageError
from .x25519 import X25519, X25519Public, X25519UsageError
class PasswordRequired(Exception):
"""Raised to indicate that the key is password protected, but a
password was not specified."""
pass
def load(path, passwd=None):
"""Try loading a key from the given path.
Returns None if the password wasn't specified."""
with open(path, 'rb') as f:
raw_pem = f.read()
try:
pk = serialization.load_pem_private_key(
raw_pem,
password=passwd,
backend=default_backend())
# Unfortunately, the crypto library raises unhelpful exceptions,
# so we have to look at the text.
except TypeError as e:
msg = str(e)
if "private key is encrypted" in msg:
return None
raise e
except ValueError:
# This seems to happen if the key is a public key, let's try
# loading it as a public key.
pk = serialization.load_pem_public_key(
raw_pem,
backend=default_backend())
if isinstance(pk, RSAPrivateKey):
if pk.key_size not in RSA_KEY_SIZES:
raise Exception("Unsupported RSA key size: " + pk.key_size)
return RSA(pk)
elif isinstance(pk, RSAPublicKey):
if pk.key_size not in RSA_KEY_SIZES:
raise Exception("Unsupported RSA key size: " + pk.key_size)
return RSAPublic(pk)
elif isinstance(pk, EllipticCurvePrivateKey):
if pk.curve.name not in ('secp256r1', 'secp384r1'):
raise Exception("Unsupported EC curve: " + pk.curve.name)
if pk.key_size not in (256, 384):
raise Exception("Unsupported EC size: " + pk.key_size)
if pk.curve.name == 'secp256r1':
return ECDSA256P1(pk)
elif pk.curve.name == 'secp384r1':
return ECDSA384P1(pk)
elif isinstance(pk, EllipticCurvePublicKey):
if pk.curve.name not in ('secp256r1', 'secp384r1'):
raise Exception("Unsupported EC curve: " + pk.curve.name)
if pk.key_size not in (256, 384):
raise Exception("Unsupported EC size: " + pk.key_size)
if pk.curve.name == 'secp256r1':
return ECDSA256P1Public(pk)
elif pk.curve.name == 'secp384r1':
return ECDSA384P1Public(pk)
elif isinstance(pk, Ed25519PrivateKey):
return Ed25519(pk)
elif isinstance(pk, Ed25519PublicKey):
return Ed25519Public(pk)
elif isinstance(pk, X25519PrivateKey):
return X25519(pk)
elif isinstance(pk, X25519PublicKey):
return X25519Public(pk)
else:
raise Exception("Unknown key type: " + str(type(pk)))

View File

@@ -0,0 +1,289 @@
"""
ECDSA key management
"""
# SPDX-License-Identifier: Apache-2.0
import os.path
import hashlib
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.hashes import SHA256, SHA384
from .general import KeyClass
from .privatebytes import PrivateBytesMixin
class ECDSAUsageError(Exception):
pass
class ECDSAPublicKey(KeyClass):
"""
Wrapper around an ECDSA public key.
"""
def __init__(self, key):
self.key = key
def _unsupported(self, name):
raise ECDSAUsageError("Operation {} requires private key".format(name))
def _get_public(self):
return self.key
def get_public_bytes(self):
# The key is embedded into MBUboot in "SubjectPublicKeyInfo" format
return self._get_public().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
def get_public_pem(self):
return self._get_public().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
def get_private_bytes(self, minimal, format):
self._unsupported('get_private_bytes')
def export_private(self, path, passwd=None):
self._unsupported('export_private')
def export_public(self, path):
"""Write the public key to the given file."""
pem = self._get_public().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
with open(path, 'wb') as f:
f.write(pem)
class ECDSAPrivateKey(PrivateBytesMixin):
"""
Wrapper around an ECDSA private key.
"""
def __init__(self, key):
self.key = key
def _get_public(self):
return self.key.public_key()
def _build_minimal_ecdsa_privkey(self, der, format):
'''
Builds a new DER that only includes the EC private key, removing the
public key that is added as an "optional" BITSTRING.
'''
if format == serialization.PrivateFormat.OpenSSH:
print(os.path.basename(__file__) +
': Warning: --minimal is supported only for PKCS8 '
'or TraditionalOpenSSL formats')
return bytearray(der)
EXCEPTION_TEXT = "Error parsing ecdsa key. Please submit an issue!"
if format == serialization.PrivateFormat.PKCS8:
offset_PUB = 68 # where the context specific TLV starts (tag 0xA1)
if der[offset_PUB] != 0xa1:
raise ECDSAUsageError(EXCEPTION_TEXT)
len_PUB = der[offset_PUB + 1] + 2 # + 2 for 0xA1 0x44 bytes
b = bytearray(der[:offset_PUB]) # remove the TLV with the PUB key
offset_SEQ = 29
if b[offset_SEQ] != 0x30:
raise ECDSAUsageError(EXCEPTION_TEXT)
b[offset_SEQ + 1] -= len_PUB
offset_OCT_STR = 27
if b[offset_OCT_STR] != 0x04:
raise ECDSAUsageError(EXCEPTION_TEXT)
b[offset_OCT_STR + 1] -= len_PUB
if b[0] != 0x30 or b[1] != 0x81:
raise ECDSAUsageError(EXCEPTION_TEXT)
# as b[1] has bit7 set, the length is on b[2]
b[2] -= len_PUB
if b[2] < 0x80:
del(b[1])
elif format == serialization.PrivateFormat.TraditionalOpenSSL:
offset_PUB = 51
if der[offset_PUB] != 0xA1:
raise ECDSAUsageError(EXCEPTION_TEXT)
len_PUB = der[offset_PUB + 1] + 2
b = bytearray(der[0:offset_PUB])
b[1] -= len_PUB
return b
_VALID_FORMATS = {
'pkcs8': serialization.PrivateFormat.PKCS8,
'openssl': serialization.PrivateFormat.TraditionalOpenSSL
}
_DEFAULT_FORMAT = 'pkcs8'
def get_private_bytes(self, minimal, format):
format, priv = self._get_private_bytes(minimal,
format, ECDSAUsageError)
if minimal:
priv = self._build_minimal_ecdsa_privkey(
priv, self._VALID_FORMATS[format])
return priv
def export_private(self, path, passwd=None):
"""Write the private key to the given file, protecting it with '
'the optional password."""
if passwd is None:
enc = serialization.NoEncryption()
else:
enc = serialization.BestAvailableEncryption(passwd)
pem = self.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=enc)
with open(path, 'wb') as f:
f.write(pem)
class ECDSA256P1Public(ECDSAPublicKey):
"""
Wrapper around an ECDSA (p256) public key.
"""
def __init__(self, key):
super().__init__(key)
self.key = key
def shortname(self):
return "ecdsa"
def sig_type(self):
return "ECDSA256_SHA256"
def sig_tlv(self):
return "ECDSASIG"
def sig_len(self):
# Early versions of MCUboot (< v1.5.0) required ECDSA
# signatures to be padded to 72 bytes. Because the DER
# encoding is done with signed integers, the size of the
# signature will vary depending on whether the high bit is set
# in each value. This padding was done in a
# not-easily-reversible way (by just adding zeros).
#
# The signing code no longer requires this padding, and newer
# versions of MCUboot don't require it. But, continue to
# return the total length so that the padding can be done if
# requested.
return 72
def verify(self, signature, payload):
# strip possible paddings added during sign
signature = signature[:signature[1] + 2]
k = self.key
if isinstance(self.key, ec.EllipticCurvePrivateKey):
k = self.key.public_key()
return k.verify(signature=signature, data=payload,
signature_algorithm=ec.ECDSA(SHA256()))
class ECDSA256P1(ECDSAPrivateKey, ECDSA256P1Public):
"""
Wrapper around an ECDSA (p256) private key.
"""
def __init__(self, key):
super().__init__(key)
self.key = key
self.pad_sig = False
@staticmethod
def generate():
pk = ec.generate_private_key(
ec.SECP256R1(),
backend=default_backend())
return ECDSA256P1(pk)
def raw_sign(self, payload):
"""Return the actual signature"""
return self.key.sign(
data=payload,
signature_algorithm=ec.ECDSA(SHA256()))
def sign(self, payload):
sig = self.raw_sign(payload)
if self.pad_sig:
# To make fixed length, pad with one or two zeros.
sig += b'\000' * (self.sig_len() - len(sig))
return sig
else:
return sig
class ECDSA384P1Public(ECDSAPublicKey):
"""
Wrapper around an ECDSA (p384) public key.
"""
def __init__(self, key):
super().__init__(key)
self.key = key
def shortname(self):
return "ecdsap384"
def sig_type(self):
return "ECDSA384_SHA384"
def sig_tlv(self):
return "ECDSASIG"
def sig_len(self):
# Early versions of MCUboot (< v1.5.0) required ECDSA
# signatures to be padded to a fixed length. Because the DER
# encoding is done with signed integers, the size of the
# signature will vary depending on whether the high bit is set
# in each value. This padding was done in a
# not-easily-reversible way (by just adding zeros).
#
# The signing code no longer requires this padding, and newer
# versions of MCUboot don't require it. But, continue to
# return the total length so that the padding can be done if
# requested.
return 103
def verify(self, signature, payload):
# strip possible paddings added during sign
signature = signature[:signature[1] + 2]
k = self.key
if isinstance(self.key, ec.EllipticCurvePrivateKey):
k = self.key.public_key()
return k.verify(signature=signature, data=payload,
signature_algorithm=ec.ECDSA(SHA384()))
class ECDSA384P1(ECDSAPrivateKey, ECDSA384P1Public):
"""
Wrapper around an ECDSA (p384) private key.
"""
def __init__(self, key):
"""key should be an instance of EllipticCurvePrivateKey"""
super().__init__(key)
self.key = key
self.pad_sig = False
@staticmethod
def generate():
pk = ec.generate_private_key(
ec.SECP384R1(),
backend=default_backend())
return ECDSA384P1(pk)
def raw_sign(self, payload):
"""Return the actual signature"""
return self.key.sign(
data=payload,
signature_algorithm=ec.ECDSA(SHA384()))
def sign(self, payload):
sig = self.raw_sign(payload)
if self.pad_sig:
# To make fixed length, pad with one or two zeros.
sig += b'\000' * (self.sig_len() - len(sig))
return sig
else:
return sig

View File

@@ -0,0 +1,120 @@
"""
Tests for ECDSA keys
"""
# SPDX-License-Identifier: Apache-2.0
import io
import os.path
import sys
import tempfile
import unittest
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.hashes import SHA256
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
from imgtool.keys import load, ECDSA256P1, ECDSAUsageError
class EcKeyGeneration(unittest.TestCase):
def setUp(self):
self.test_dir = tempfile.TemporaryDirectory()
def tname(self, base):
return os.path.join(self.test_dir.name, base)
def tearDown(self):
self.test_dir.cleanup()
def test_keygen(self):
name1 = self.tname("keygen.pem")
k = ECDSA256P1.generate()
k.export_private(name1, b'secret')
self.assertIsNone(load(name1))
k2 = load(name1, b'secret')
pubname = self.tname('keygen-pub.pem')
k2.export_public(pubname)
pk2 = load(pubname)
# We should be able to export the public key from the loaded
# public key, but not the private key.
pk2.export_public(self.tname('keygen-pub2.pem'))
self.assertRaises(ECDSAUsageError,
pk2.export_private, self.tname('keygen-priv2.pem'))
def test_emit(self):
"""Basic sanity check on the code emitters."""
k = ECDSA256P1.generate()
pubpem = io.StringIO()
k.emit_public_pem(pubpem)
self.assertIn("BEGIN PUBLIC KEY", pubpem.getvalue())
self.assertIn("END PUBLIC KEY", pubpem.getvalue())
ccode = io.StringIO()
k.emit_c_public(ccode)
self.assertIn("ecdsa_pub_key", ccode.getvalue())
self.assertIn("ecdsa_pub_key_len", ccode.getvalue())
hashccode = io.StringIO()
k.emit_c_public_hash(hashccode)
self.assertIn("ecdsa_pub_key_hash", hashccode.getvalue())
self.assertIn("ecdsa_pub_key_hash_len", hashccode.getvalue())
rustcode = io.StringIO()
k.emit_rust_public(rustcode)
self.assertIn("ECDSA_PUB_KEY", rustcode.getvalue())
# raw data - bytes
pubraw = io.BytesIO()
k.emit_raw_public(pubraw)
self.assertTrue(len(pubraw.getvalue()) > 0)
hashraw = io.BytesIO()
k.emit_raw_public_hash(hashraw)
self.assertTrue(len(hashraw.getvalue()) > 0)
def test_emit_pub(self):
"""Basic sanity check on the code emitters, from public key."""
pubname = self.tname("public.pem")
k = ECDSA256P1.generate()
k.export_public(pubname)
k2 = load(pubname)
ccode = io.StringIO()
k2.emit_c_public(ccode)
self.assertIn("ecdsa_pub_key", ccode.getvalue())
self.assertIn("ecdsa_pub_key_len", ccode.getvalue())
rustcode = io.StringIO()
k2.emit_rust_public(rustcode)
self.assertIn("ECDSA_PUB_KEY", rustcode.getvalue())
def test_sig(self):
k = ECDSA256P1.generate()
buf = b'This is the message'
sig = k.raw_sign(buf)
# The code doesn't have any verification, so verify this
# manually.
k.key.public_key().verify(
signature=sig,
data=buf,
signature_algorithm=ec.ECDSA(SHA256()))
# Modify the message to make sure the signature fails.
self.assertRaises(InvalidSignature,
k.key.public_key().verify,
signature=sig,
data=b'This is thE message',
signature_algorithm=ec.ECDSA(SHA256()))
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,111 @@
"""
ED25519 key management
"""
# SPDX-License-Identifier: Apache-2.0
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
from .general import KeyClass
class Ed25519UsageError(Exception):
pass
class Ed25519Public(KeyClass):
def __init__(self, key):
self.key = key
def shortname(self):
return "ed25519"
def _unsupported(self, name):
raise Ed25519UsageError("Operation {} requires private key".format(name))
def _get_public(self):
return self.key
def get_public_bytes(self):
# The key is embedded into MBUboot in "SubjectPublicKeyInfo" format
return self._get_public().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
def get_public_pem(self):
return self._get_public().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
def get_private_bytes(self, minimal, format):
self._unsupported('get_private_bytes')
def export_private(self, path, passwd=None):
self._unsupported('export_private')
def export_public(self, path):
"""Write the public key to the given file."""
pem = self._get_public().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
with open(path, 'wb') as f:
f.write(pem)
def sig_type(self):
return "ED25519"
def sig_tlv(self):
return "ED25519"
def sig_len(self):
return 64
def verify_digest(self, signature, digest):
"""Verify that signature is valid for given digest"""
k = self.key
if isinstance(self.key, ed25519.Ed25519PrivateKey):
k = self.key.public_key()
return k.verify(signature=signature, data=digest)
class Ed25519(Ed25519Public):
"""
Wrapper around an ED25519 private key.
"""
def __init__(self, key):
"""key should be an instance of EllipticCurvePrivateKey"""
self.key = key
@staticmethod
def generate():
pk = ed25519.Ed25519PrivateKey.generate()
return Ed25519(pk)
def _get_public(self):
return self.key.public_key()
def get_private_bytes(self, minimal, format):
raise Ed25519UsageError("Operation not supported with {} keys".format(
self.shortname()))
def export_private(self, path, passwd=None):
"""
Write the private key to the given file, protecting it with the
optional password.
"""
if passwd is None:
enc = serialization.NoEncryption()
else:
enc = serialization.BestAvailableEncryption(passwd)
pem = self.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=enc)
with open(path, 'wb') as f:
f.write(pem)
def sign_digest(self, digest):
"""Return the actual signature"""
return self.key.sign(data=digest)

View File

@@ -0,0 +1,124 @@
"""
Tests for ECDSA keys
"""
# SPDX-License-Identifier: Apache-2.0
import hashlib
import io
import os.path
import sys
import tempfile
import unittest
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric import ed25519
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
from imgtool.keys import load, Ed25519, Ed25519UsageError
class Ed25519KeyGeneration(unittest.TestCase):
def setUp(self):
self.test_dir = tempfile.TemporaryDirectory()
def tname(self, base):
return os.path.join(self.test_dir.name, base)
def tearDown(self):
self.test_dir.cleanup()
def test_keygen(self):
name1 = self.tname("keygen.pem")
k = Ed25519.generate()
k.export_private(name1, b'secret')
self.assertIsNone(load(name1))
k2 = load(name1, b'secret')
pubname = self.tname('keygen-pub.pem')
k2.export_public(pubname)
pk2 = load(pubname)
# We should be able to export the public key from the loaded
# public key, but not the private key.
pk2.export_public(self.tname('keygen-pub2.pem'))
self.assertRaises(Ed25519UsageError,
pk2.export_private, self.tname('keygen-priv2.pem'))
def test_emit(self):
"""Basic sanity check on the code emitters."""
k = Ed25519.generate()
pubpem = io.StringIO()
k.emit_public_pem(pubpem)
self.assertIn("BEGIN PUBLIC KEY", pubpem.getvalue())
self.assertIn("END PUBLIC KEY", pubpem.getvalue())
ccode = io.StringIO()
k.emit_c_public(ccode)
self.assertIn("ed25519_pub_key", ccode.getvalue())
self.assertIn("ed25519_pub_key_len", ccode.getvalue())
hashccode = io.StringIO()
k.emit_c_public_hash(hashccode)
self.assertIn("ed25519_pub_key_hash", hashccode.getvalue())
self.assertIn("ed25519_pub_key_hash_len", hashccode.getvalue())
rustcode = io.StringIO()
k.emit_rust_public(rustcode)
self.assertIn("ED25519_PUB_KEY", rustcode.getvalue())
# raw data - bytes
pubraw = io.BytesIO()
k.emit_raw_public(pubraw)
self.assertTrue(len(pubraw.getvalue()) > 0)
hashraw = io.BytesIO()
k.emit_raw_public_hash(hashraw)
self.assertTrue(len(hashraw.getvalue()) > 0)
def test_emit_pub(self):
"""Basic sanity check on the code emitters, from public key."""
pubname = self.tname("public.pem")
k = Ed25519.generate()
k.export_public(pubname)
k2 = load(pubname)
ccode = io.StringIO()
k2.emit_c_public(ccode)
self.assertIn("ed25519_pub_key", ccode.getvalue())
self.assertIn("ed25519_pub_key_len", ccode.getvalue())
rustcode = io.StringIO()
k2.emit_rust_public(rustcode)
self.assertIn("ED25519_PUB_KEY", rustcode.getvalue())
def test_sig(self):
k = Ed25519.generate()
buf = b'This is the message'
sha = hashlib.sha256()
sha.update(buf)
digest = sha.digest()
sig = k.sign_digest(digest)
# The code doesn't have any verification, so verify this
# manually.
k.key.public_key().verify(signature=sig, data=digest)
# Modify the message to make sure the signature fails.
sha = hashlib.sha256()
sha.update(b'This is thE message')
new_digest = sha.digest()
self.assertRaises(InvalidSignature,
k.key.public_key().verify,
signature=sig,
data=new_digest)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,115 @@
"""General key class."""
# SPDX-License-Identifier: Apache-2.0
import binascii
import io
import os
import sys
from cryptography.hazmat.primitives.hashes import Hash, SHA256
AUTOGEN_MESSAGE = "/* Autogenerated by imgtool.py, do not edit. */"
class FileHandler(object):
def __init__(self, file, *args, **kwargs):
self.file_in = file
self.args = args
self.kwargs = kwargs
def __enter__(self):
if isinstance(self.file_in, (str, bytes, os.PathLike)):
self.file = open(self.file_in, *self.args, **self.kwargs)
else:
self.file = self.file_in
return self.file
def __exit__(self, *args):
if self.file != self.file_in:
self.file.close()
class KeyClass(object):
def _emit(self, header, trailer, encoded_bytes, indent, file=sys.stdout,
len_format=None):
with FileHandler(file, 'w') as file:
self._emit_to_output(header, trailer, encoded_bytes, indent,
file, len_format)
def _emit_to_output(self, header, trailer, encoded_bytes, indent, file,
len_format):
print(AUTOGEN_MESSAGE, file=file)
print(header, end='', file=file)
for count, b in enumerate(encoded_bytes):
if count % 8 == 0:
print("\n" + indent, end='', file=file)
else:
print(" ", end='', file=file)
print("0x{:02x},".format(b), end='', file=file)
print("\n" + trailer, file=file)
if len_format is not None:
print(len_format.format(len(encoded_bytes)), file=file)
def _emit_raw(self, encoded_bytes, file):
with FileHandler(file, 'wb') as file:
try:
# file.buffer is not part of the TextIOBase API
# and may not exist in some implementations.
file.buffer.write(encoded_bytes)
except AttributeError:
# raw binary data, can be for example io.BytesIO
file.write(encoded_bytes)
def emit_c_public(self, file=sys.stdout):
self._emit(
header="const unsigned char {}_pub_key[] = {{"
.format(self.shortname()),
trailer="};",
encoded_bytes=self.get_public_bytes(),
indent=" ",
len_format="const unsigned int {}_pub_key_len = {{}};"
.format(self.shortname()),
file=file)
def emit_c_public_hash(self, file=sys.stdout):
digest = Hash(SHA256())
digest.update(self.get_public_bytes())
self._emit(
header="const unsigned char {}_pub_key_hash[] = {{"
.format(self.shortname()),
trailer="};",
encoded_bytes=digest.finalize(),
indent=" ",
len_format="const unsigned int {}_pub_key_hash_len = {{}};"
.format(self.shortname()),
file=file)
def emit_raw_public(self, file=sys.stdout):
self._emit_raw(self.get_public_bytes(), file=file)
def emit_raw_public_hash(self, file=sys.stdout):
digest = Hash(SHA256())
digest.update(self.get_public_bytes())
self._emit_raw(digest.finalize(), file=file)
def emit_rust_public(self, file=sys.stdout):
self._emit(
header="static {}_PUB_KEY: &[u8] = &["
.format(self.shortname().upper()),
trailer="];",
encoded_bytes=self.get_public_bytes(),
indent=" ",
file=file)
def emit_public_pem(self, file=sys.stdout):
with FileHandler(file, 'w') as file:
print(str(self.get_public_pem(), 'utf-8'), file=file, end='')
def emit_private(self, minimal, format, file=sys.stdout):
self._emit(
header="const unsigned char enc_priv_key[] = {",
trailer="};",
encoded_bytes=self.get_private_bytes(minimal, format),
indent=" ",
len_format="const unsigned int enc_priv_key_len = {};",
file=file)

View File

@@ -0,0 +1,16 @@
# SPDX-License-Identifier: Apache-2.0
from cryptography.hazmat.primitives import serialization
class PrivateBytesMixin():
def _get_private_bytes(self, minimal, format, exclass):
if format is None:
format = self._DEFAULT_FORMAT
if format not in self._VALID_FORMATS:
raise exclass("{} does not support {}".format(
self.shortname(), format))
return format, self.key.private_bytes(
encoding=serialization.Encoding.DER,
format=self._VALID_FORMATS[format],
encryption_algorithm=serialization.NoEncryption())

View File

@@ -0,0 +1,173 @@
"""
RSA Key management
"""
# SPDX-License-Identifier: Apache-2.0
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric.padding import PSS, MGF1
from cryptography.hazmat.primitives.hashes import SHA256
from .general import KeyClass
from .privatebytes import PrivateBytesMixin
# Sizes that bootutil will recognize
RSA_KEY_SIZES = [2048, 3072]
class RSAUsageError(Exception):
pass
class RSAPublic(KeyClass):
"""The public key can only do a few operations"""
def __init__(self, key):
self.key = key
def key_size(self):
return self.key.key_size
def shortname(self):
return "rsa"
def _unsupported(self, name):
raise RSAUsageError("Operation {} requires private key".format(name))
def _get_public(self):
return self.key
def get_public_bytes(self):
# The key embedded into MCUboot is in PKCS1 format.
return self._get_public().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.PKCS1)
def get_public_pem(self):
return self._get_public().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
def get_private_bytes(self, minimal, format):
self._unsupported('get_private_bytes')
def export_private(self, path, passwd=None):
self._unsupported('export_private')
def export_public(self, path):
"""Write the public key to the given file."""
pem = self._get_public().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
with open(path, 'wb') as f:
f.write(pem)
def sig_type(self):
return "PKCS1_PSS_RSA{}_SHA256".format(self.key_size())
def sig_tlv(self):
return"RSA{}".format(self.key_size())
def sig_len(self):
return self.key_size() / 8
def verify(self, signature, payload):
k = self.key
if isinstance(self.key, rsa.RSAPrivateKey):
k = self.key.public_key()
return k.verify(signature=signature, data=payload,
padding=PSS(mgf=MGF1(SHA256()), salt_length=32),
algorithm=SHA256())
class RSA(RSAPublic, PrivateBytesMixin):
"""
Wrapper around an RSA key, with imgtool support.
"""
def __init__(self, key):
"""The key should be a private key from cryptography"""
self.key = key
@staticmethod
def generate(key_size=2048):
if key_size not in RSA_KEY_SIZES:
raise RSAUsageError("Key size {} is not supported by MCUboot"
.format(key_size))
pk = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
backend=default_backend())
return RSA(pk)
def _get_public(self):
return self.key.public_key()
def _build_minimal_rsa_privkey(self, der):
'''
Builds a new DER that only includes N/E/D/P/Q RSA parameters;
standard DER private bytes provided by OpenSSL also includes
CRT params (DP/DQ/QP) which can be removed.
'''
OFFSET_N = 7 # N is always located at this offset
b = bytearray(der)
off = OFFSET_N
if b[off + 1] != 0x82:
raise RSAUsageError("Error parsing N while minimizing")
len_N = (b[off + 2] << 8) + b[off + 3] + 4
off += len_N
if b[off + 1] != 0x03:
raise RSAUsageError("Error parsing E while minimizing")
len_E = b[off + 2] + 4
off += len_E
if b[off + 1] != 0x82:
raise RSAUsageError("Error parsing D while minimizing")
len_D = (b[off + 2] << 8) + b[off + 3] + 4
off += len_D
if b[off + 1] != 0x81:
raise RSAUsageError("Error parsing P while minimizing")
len_P = b[off + 2] + 3
off += len_P
if b[off + 1] != 0x81:
raise RSAUsageError("Error parsing Q while minimizing")
len_Q = b[off + 2] + 3
off += len_Q
# adjust DER size for removed elements
b[2] = (off - 4) >> 8
b[3] = (off - 4) & 0xff
return b[:off]
_VALID_FORMATS = {
'openssl': serialization.PrivateFormat.TraditionalOpenSSL
}
_DEFAULT_FORMAT = 'openssl'
def get_private_bytes(self, minimal, format):
_, priv = self._get_private_bytes(minimal, format, RSAUsageError)
if minimal:
priv = self._build_minimal_rsa_privkey(priv)
return priv
def export_private(self, path, passwd=None):
"""Write the private key to the given file, protecting it with the
optional password."""
if passwd is None:
enc = serialization.NoEncryption()
else:
enc = serialization.BestAvailableEncryption(passwd)
pem = self.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=enc)
with open(path, 'wb') as f:
f.write(pem)
def sign(self, payload):
# The verification code only allows the salt length to be the
# same as the hash length, 32.
return self.key.sign(
data=payload,
padding=PSS(mgf=MGF1(SHA256()), salt_length=32),
algorithm=SHA256())

View File

@@ -0,0 +1,136 @@
"""
Tests for RSA keys
"""
# SPDX-License-Identifier: Apache-2.0
import io
import os
import sys
import tempfile
import unittest
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.padding import PSS, MGF1
from cryptography.hazmat.primitives.hashes import SHA256
# Setup sys path so 'imgtool' is in it.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__),
'../..')))
from imgtool.keys import load, RSA, RSAUsageError
from imgtool.keys.rsa import RSA_KEY_SIZES
class KeyGeneration(unittest.TestCase):
def setUp(self):
self.test_dir = tempfile.TemporaryDirectory()
def tname(self, base):
return os.path.join(self.test_dir.name, base)
def tearDown(self):
self.test_dir.cleanup()
def test_keygen(self):
# Try generating a RSA key with non-supported size
with self.assertRaises(RSAUsageError):
RSA.generate(key_size=1024)
for key_size in RSA_KEY_SIZES:
name1 = self.tname("keygen.pem")
k = RSA.generate(key_size=key_size)
k.export_private(name1, b'secret')
# Try loading the key without a password.
self.assertIsNone(load(name1))
k2 = load(name1, b'secret')
pubname = self.tname('keygen-pub.pem')
k2.export_public(pubname)
pk2 = load(pubname)
# We should be able to export the public key from the loaded
# public key, but not the private key.
pk2.export_public(self.tname('keygen-pub2.pem'))
self.assertRaises(RSAUsageError, pk2.export_private,
self.tname('keygen-priv2.pem'))
def test_emit(self):
"""Basic sanity check on the code emitters."""
for key_size in RSA_KEY_SIZES:
k = RSA.generate(key_size=key_size)
pubpem = io.StringIO()
k.emit_public_pem(pubpem)
self.assertIn("BEGIN PUBLIC KEY", pubpem.getvalue())
self.assertIn("END PUBLIC KEY", pubpem.getvalue())
ccode = io.StringIO()
k.emit_c_public(ccode)
self.assertIn("rsa_pub_key", ccode.getvalue())
self.assertIn("rsa_pub_key_len", ccode.getvalue())
hashccode = io.StringIO()
k.emit_c_public_hash(hashccode)
self.assertIn("rsa_pub_key_hash", hashccode.getvalue())
self.assertIn("rsa_pub_key_hash_len", hashccode.getvalue())
rustcode = io.StringIO()
k.emit_rust_public(rustcode)
self.assertIn("RSA_PUB_KEY", rustcode.getvalue())
# raw data - bytes
pubraw = io.BytesIO()
k.emit_raw_public(pubraw)
self.assertTrue(len(pubraw.getvalue()) > 0)
hashraw = io.BytesIO()
k.emit_raw_public_hash(hashraw)
self.assertTrue(len(hashraw.getvalue()) > 0)
def test_emit_pub(self):
"""Basic sanity check on the code emitters, from public key."""
pubname = self.tname("public.pem")
for key_size in RSA_KEY_SIZES:
k = RSA.generate(key_size=key_size)
k.export_public(pubname)
k2 = load(pubname)
ccode = io.StringIO()
k2.emit_c_public(ccode)
self.assertIn("rsa_pub_key", ccode.getvalue())
self.assertIn("rsa_pub_key_len", ccode.getvalue())
rustcode = io.StringIO()
k2.emit_rust_public(rustcode)
self.assertIn("RSA_PUB_KEY", rustcode.getvalue())
def test_sig(self):
for key_size in RSA_KEY_SIZES:
k = RSA.generate(key_size=key_size)
buf = b'This is the message'
sig = k.sign(buf)
# The code doesn't have any verification, so verify this
# manually.
k.key.public_key().verify(
signature=sig,
data=buf,
padding=PSS(mgf=MGF1(SHA256()), salt_length=32),
algorithm=SHA256())
# Modify the message to make sure the signature fails.
self.assertRaises(InvalidSignature,
k.key.public_key().verify,
signature=sig,
data=b'This is thE message',
padding=PSS(mgf=MGF1(SHA256()), salt_length=32),
algorithm=SHA256())
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,119 @@
"""
X25519 key management
"""
# SPDX-License-Identifier: Apache-2.0
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import x25519
from .general import KeyClass
from .privatebytes import PrivateBytesMixin
class X25519UsageError(Exception):
pass
class X25519Public(KeyClass):
def __init__(self, key):
self.key = key
def shortname(self):
return "x25519"
def _unsupported(self, name):
raise X25519UsageError("Operation {} requires private key".format(name))
def _get_public(self):
return self.key
def get_public_bytes(self):
# The key is embedded into MBUboot in "SubjectPublicKeyInfo" format
return self._get_public().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
def get_public_pem(self):
return self._get_public().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
def get_private_bytes(self, minimal, format):
self._unsupported('get_private_bytes')
def export_private(self, path, passwd=None):
self._unsupported('export_private')
def export_public(self, path):
"""Write the public key to the given file."""
pem = self._get_public().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
with open(path, 'wb') as f:
f.write(pem)
def sig_type(self):
return "X25519"
def sig_tlv(self):
return "X25519"
def sig_len(self):
return 32
class X25519(X25519Public, PrivateBytesMixin):
"""
Wrapper around an X25519 private key.
"""
def __init__(self, key):
"""key should be an instance of EllipticCurvePrivateKey"""
self.key = key
@staticmethod
def generate():
pk = x25519.X25519PrivateKey.generate()
return X25519(pk)
def _get_public(self):
return self.key.public_key()
_VALID_FORMATS = {
'pkcs8': serialization.PrivateFormat.PKCS8
}
_DEFAULT_FORMAT = 'pkcs8'
def get_private_bytes(self, minimal, format):
_, priv = self._get_private_bytes(minimal, format,
X25519UsageError)
return priv
def export_private(self, path, passwd=None):
"""
Write the private key to the given file, protecting it with the
optional password.
"""
if passwd is None:
enc = serialization.NoEncryption()
else:
enc = serialization.BestAvailableEncryption(passwd)
pem = self.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=enc)
with open(path, 'wb') as f:
f.write(pem)
def sign_digest(self, digest):
"""Return the actual signature"""
return self.key.sign(data=digest)
def verify_digest(self, signature, digest):
"""Verify that signature is valid for given digest"""
k = self.key
if isinstance(self.key, x25519.X25519PrivateKey):
k = self.key.public_key()
return k.verify(signature=signature, data=digest)

View File

@@ -0,0 +1,618 @@
#! /usr/bin/env python3
#
# Copyright 2017-2020 Linaro Limited
# Copyright 2019-2023 Arm Limited
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import click
import getpass
import imgtool.keys as keys
import sys
import struct
import os
import lzma
import hashlib
import base64
from imgtool import image, imgtool_version
from imgtool.version import decode_version
from imgtool.dumpinfo import dump_imginfo
from .keys import (
RSAUsageError, ECDSAUsageError, Ed25519UsageError, X25519UsageError)
comp_default_dictsize=131072
comp_default_pb=2
comp_default_lc=3
comp_default_lp=1
comp_default_preset=9
MIN_PYTHON_VERSION = (3, 6)
if sys.version_info < MIN_PYTHON_VERSION:
sys.exit("Python %s.%s or newer is required by imgtool."
% MIN_PYTHON_VERSION)
def gen_rsa2048(keyfile, passwd):
keys.RSA.generate().export_private(path=keyfile, passwd=passwd)
def gen_rsa3072(keyfile, passwd):
keys.RSA.generate(key_size=3072).export_private(path=keyfile,
passwd=passwd)
def gen_ecdsa_p256(keyfile, passwd):
keys.ECDSA256P1.generate().export_private(keyfile, passwd=passwd)
def gen_ecdsa_p384(keyfile, passwd):
keys.ECDSA384P1.generate().export_private(keyfile, passwd=passwd)
def gen_ed25519(keyfile, passwd):
keys.Ed25519.generate().export_private(path=keyfile, passwd=passwd)
def gen_x25519(keyfile, passwd):
keys.X25519.generate().export_private(path=keyfile, passwd=passwd)
valid_langs = ['c', 'rust']
valid_hash_encodings = ['lang-c', 'raw']
valid_encodings = ['lang-c', 'lang-rust', 'pem', 'raw']
keygens = {
'rsa-2048': gen_rsa2048,
'rsa-3072': gen_rsa3072,
'ecdsa-p256': gen_ecdsa_p256,
'ecdsa-p384': gen_ecdsa_p384,
'ed25519': gen_ed25519,
'x25519': gen_x25519,
}
valid_formats = ['openssl', 'pkcs8']
valid_sha = [ 'auto', '256', '384', '512' ]
def load_signature(sigfile):
with open(sigfile, 'rb') as f:
signature = base64.b64decode(f.read())
return signature
def save_signature(sigfile, sig):
with open(sigfile, 'wb') as f:
signature = base64.b64encode(sig)
f.write(signature)
def load_key(keyfile):
# TODO: better handling of invalid pass-phrase
key = keys.load(keyfile)
if key is not None:
return key
passwd = getpass.getpass("Enter key passphrase: ").encode('utf-8')
return keys.load(keyfile, passwd)
def get_password():
while True:
passwd = getpass.getpass("Enter key passphrase: ")
passwd2 = getpass.getpass("Reenter passphrase: ")
if passwd == passwd2:
break
print("Passwords do not match, try again")
# Password must be bytes, always use UTF-8 for consistent
# encoding.
return passwd.encode('utf-8')
@click.option('-p', '--password', is_flag=True,
help='Prompt for password to protect key')
@click.option('-t', '--type', metavar='type', required=True,
type=click.Choice(keygens.keys()), prompt=True,
help='{}'.format('One of: {}'.format(', '.join(keygens.keys()))))
@click.option('-k', '--key', metavar='filename', required=True)
@click.command(help='Generate pub/private keypair')
def keygen(type, key, password):
password = get_password() if password else None
keygens[type](key, password)
@click.option('-l', '--lang', metavar='lang',
type=click.Choice(valid_langs),
help='This option is deprecated. Please use the '
'`--encoding` option. '
'Valid langs: {}'.format(', '.join(valid_langs)))
@click.option('-e', '--encoding', metavar='encoding',
type=click.Choice(valid_encodings),
help='Valid encodings: {}'.format(', '.join(valid_encodings)))
@click.option('-k', '--key', metavar='filename', required=True)
@click.option('-o', '--output', metavar='output', required=False,
help='Specify the output file\'s name. \
The stdout is used if it is not provided.')
@click.command(help='Dump public key from keypair')
def getpub(key, encoding, lang, output):
if encoding and lang:
raise click.UsageError('Please use only one of `--encoding/-e` '
'or `--lang/-l`')
elif not encoding and not lang:
# Preserve old behavior defaulting to `c`. If `lang` is removed,
# `default=valid_encodings[0]` should be added to `-e` param.
lang = valid_langs[0]
key = load_key(key)
if not output:
output = sys.stdout
if key is None:
print("Invalid passphrase")
elif lang == 'c' or encoding == 'lang-c':
key.emit_c_public(file=output)
elif lang == 'rust' or encoding == 'lang-rust':
key.emit_rust_public(file=output)
elif encoding == 'pem':
key.emit_public_pem(file=output)
elif encoding == 'raw':
key.emit_raw_public(file=output)
else:
raise click.UsageError()
@click.option('-e', '--encoding', metavar='encoding',
type=click.Choice(valid_hash_encodings),
help='Valid encodings: {}. '
'Default value is {}.'
.format(', '.join(valid_hash_encodings),
valid_hash_encodings[0]))
@click.option('-k', '--key', metavar='filename', required=True)
@click.option('-o', '--output', metavar='output', required=False,
help='Specify the output file\'s name. \
The stdout is used if it is not provided.')
@click.command(help='Dump the SHA256 hash of the public key')
def getpubhash(key, output, encoding):
if not encoding:
encoding = valid_hash_encodings[0]
key = load_key(key)
if not output:
output = sys.stdout
if key is None:
print("Invalid passphrase")
elif encoding == 'lang-c':
key.emit_c_public_hash(file=output)
elif encoding == 'raw':
key.emit_raw_public_hash(file=output)
else:
raise click.UsageError()
@click.option('--minimal', default=False, is_flag=True,
help='Reduce the size of the dumped private key to include only '
'the minimum amount of data required to decrypt. This '
'might require changes to the build config. Check the docs!'
)
@click.option('-k', '--key', metavar='filename', required=True)
@click.option('-f', '--format',
type=click.Choice(valid_formats),
help='Valid formats: {}'.format(', '.join(valid_formats))
)
@click.command(help='Dump private key from keypair')
def getpriv(key, minimal, format):
key = load_key(key)
if key is None:
print("Invalid passphrase")
try:
key.emit_private(minimal, format)
except (RSAUsageError, ECDSAUsageError, Ed25519UsageError,
X25519UsageError) as e:
raise click.UsageError(e)
@click.argument('imgfile')
@click.option('-k', '--key', metavar='filename')
@click.command(help="Check that signed image can be verified by given key")
def verify(key, imgfile):
key = load_key(key) if key else None
ret, version, digest, signature = image.Image.verify(imgfile, key)
if ret == image.VerifyResult.OK:
print("Image was correctly validated")
print("Image version: {}.{}.{}+{}".format(*version))
if digest:
print("Image digest: {}".format(digest.hex()))
if signature and digest is None:
print("Image signature over image: {}".format(signature.hex()))
return
elif ret == image.VerifyResult.INVALID_MAGIC:
print("Invalid image magic; is this an MCUboot image?")
elif ret == image.VerifyResult.INVALID_TLV_INFO_MAGIC:
print("Invalid TLV info magic; is this an MCUboot image?")
elif ret == image.VerifyResult.INVALID_HASH:
print("Image has an invalid hash")
elif ret == image.VerifyResult.INVALID_SIGNATURE:
print("No signature found for the given key")
elif ret == image.VerifyResult.KEY_MISMATCH:
print("Key type does not match TLV record")
else:
print("Unknown return code: {}".format(ret))
sys.exit(1)
@click.argument('imgfile')
@click.option('-o', '--outfile', metavar='filename', required=False,
help='Save image information to outfile in YAML format')
@click.option('-s', '--silent', default=False, is_flag=True,
help='Do not print image information to output')
@click.command(help='Print header, TLV area and trailer information '
'of a signed image')
def dumpinfo(imgfile, outfile, silent):
dump_imginfo(imgfile, outfile, silent)
print("dumpinfo has run successfully")
def validate_version(ctx, param, value):
try:
decode_version(value)
return value
except ValueError as e:
raise click.BadParameter("{}".format(e))
def validate_security_counter(ctx, param, value):
if value is not None:
if value.lower() == 'auto':
return 'auto'
else:
try:
return int(value, 0)
except ValueError:
raise click.BadParameter(
"{} is not a valid integer. Please use code literals "
"prefixed with 0b/0B, 0o/0O, or 0x/0X as necessary."
.format(value))
def validate_header_size(ctx, param, value):
min_hdr_size = image.IMAGE_HEADER_SIZE
if value < min_hdr_size:
raise click.BadParameter(
"Minimum value for -H/--header-size is {}".format(min_hdr_size))
return value
def get_dependencies(ctx, param, value):
if value is not None:
versions = []
images = re.findall(r"\((\d+)", value)
if len(images) == 0:
raise click.BadParameter(
"Image dependency format is invalid: {}".format(value))
raw_versions = re.findall(r",\s*([0-9.+]+)\)", value)
if len(images) != len(raw_versions):
raise click.BadParameter(
'''There's a mismatch between the number of dependency images
and versions in: {}'''.format(value))
for raw_version in raw_versions:
try:
versions.append(decode_version(raw_version))
except ValueError as e:
raise click.BadParameter("{}".format(e))
dependencies = dict()
dependencies[image.DEP_IMAGES_KEY] = images
dependencies[image.DEP_VERSIONS_KEY] = versions
return dependencies
def create_lzma2_header(dictsize, pb, lc, lp):
header = bytearray()
for i in range(0, 40):
if dictsize <= ((2 | ((i) & 1)) << int((i) / 2 + 11)):
header.append(i)
break
header.append( ( pb * 5 + lp) * 9 + lc)
return header
class BasedIntParamType(click.ParamType):
name = 'integer'
def convert(self, value, param, ctx):
try:
return int(value, 0)
except ValueError:
self.fail('%s is not a valid integer. Please use code literals '
'prefixed with 0b/0B, 0o/0O, or 0x/0X as necessary.'
% value, param, ctx)
@click.argument('outfile')
@click.argument('infile')
@click.option('--non-bootable', default=False, is_flag=True,
help='Mark the image as non-bootable.')
@click.option('--custom-tlv', required=False, nargs=2, default=[],
multiple=True, metavar='[tag] [value]',
help='Custom TLV that will be placed into protected area. '
'Add "0x" prefix if the value should be interpreted as an '
'integer, otherwise it will be interpreted as a string. '
'Specify the option multiple times to add multiple TLVs.')
@click.option('-R', '--erased-val', type=click.Choice(['0', '0xff']),
required=False,
help='The value that is read back from erased flash.')
@click.option('-x', '--hex-addr', type=BasedIntParamType(), required=False,
help='Adjust address in hex output file.')
@click.option('-L', '--load-addr', type=BasedIntParamType(), required=False,
help='Load address for image when it should run from RAM.')
@click.option('-F', '--rom-fixed', type=BasedIntParamType(), required=False,
help='Set flash address the image is built for.')
@click.option('--save-enctlv', default=False, is_flag=True,
help='When upgrading, save encrypted key TLVs instead of plain '
'keys. Enable when BOOT_SWAP_SAVE_ENCTLV config option '
'was set.')
@click.option('-E', '--encrypt', metavar='filename',
help='Encrypt image using the provided public key. '
'(Not supported in direct-xip or ram-load mode.)')
@click.option('--encrypt-keylen', default='128',
type=click.Choice(['128', '256']),
help='When encrypting the image using AES, select a 128 bit or '
'256 bit key len.')
@click.option('--compression', default='disabled',
type=click.Choice(['disabled', 'lzma2', 'lzma2armthumb']),
help='Enable image compression using specified type. '
'Will fall back without image compression automatically '
'if the compression increases the image size.')
@click.option('-c', '--clear', required=False, is_flag=True, default=False,
help='Output a non-encrypted image with encryption capabilities,'
'so it can be installed in the primary slot, and encrypted '
'when swapped to the secondary.')
@click.option('-e', '--endian', type=click.Choice(['little', 'big']),
default='little', help="Select little or big endian")
@click.option('--overwrite-only', default=False, is_flag=True,
help='Use overwrite-only instead of swap upgrades')
@click.option('--boot-record', metavar='sw_type', help='Create CBOR encoded '
'boot record TLV. The sw_type represents the role of the '
'software component (e.g. CoFM for coprocessor firmware). '
'[max. 12 characters]')
@click.option('-M', '--max-sectors', type=int,
help='When padding allow for this amount of sectors (defaults '
'to 128)')
@click.option('--confirm', default=False, is_flag=True,
help='When padding the image, mark it as confirmed (implies '
'--pad)')
@click.option('--pad', default=False, is_flag=True,
help='Pad image to --slot-size bytes, adding trailer magic')
@click.option('-S', '--slot-size', type=BasedIntParamType(), required=True,
help='Size of the slot. If the slots have different sizes, use '
'the size of the secondary slot.')
@click.option('--pad-header', default=False, is_flag=True,
help='Add --header-size zeroed bytes at the beginning of the '
'image')
@click.option('-H', '--header-size', callback=validate_header_size,
type=BasedIntParamType(), required=True)
@click.option('--pad-sig', default=False, is_flag=True,
help='Add 0-2 bytes of padding to ECDSA signature '
'(for mcuboot <1.5)')
@click.option('-d', '--dependencies', callback=get_dependencies,
required=False, help='''Add dependence on another image, format:
"(<image_ID>,<image_version>), ... "''')
@click.option('-s', '--security-counter', callback=validate_security_counter,
help='Specify the value of security counter. Use the `auto` '
'keyword to automatically generate it from the image version.')
@click.option('-v', '--version', callback=validate_version, required=True)
@click.option('--align', type=click.Choice(['1', '2', '4', '8', '16', '32']),
default='1',
required=False,
help='Alignment used by swap update modes.')
@click.option('--max-align', type=click.Choice(['8', '16', '32']),
required=False,
help='Maximum flash alignment. Set if flash alignment of the '
'primary and secondary slot differ and any of them is larger '
'than 8.')
@click.option('--public-key-format', type=click.Choice(['hash', 'full']),
default='hash', help='In what format to add the public key to '
'the image manifest: full key or hash of the key.')
@click.option('-k', '--key', metavar='filename')
@click.option('--fix-sig', metavar='filename',
help='fixed signature for the image. It will be used instead of '
'the signature calculated using the public key')
@click.option('--fix-sig-pubkey', metavar='filename',
help='public key relevant to fixed signature')
@click.option('--pure', 'is_pure', is_flag=True, default=False, show_default=True,
help='Expected Pure variant of signature; the Pure variant is '
'expected to be signature done over an image rather than hash of '
'that image.')
@click.option('--sig-out', metavar='filename',
help='Path to the file to which signature will be written. '
'The image signature will be encoded as base64 formatted string')
@click.option('--sha', 'user_sha', type=click.Choice(valid_sha), default='auto',
help='selected sha algorithm to use; defaults to "auto" which is 256 if '
'no cryptographic signature is used, or default for signature type')
@click.option('--vector-to-sign', type=click.Choice(['payload', 'digest']),
help='send to OUTFILE the payload or payload''s digest instead '
'of complied image. These data can be used for external image '
'signing')
@click.command(help='''Create a signed or unsigned image\n
INFILE and OUTFILE are parsed as Intel HEX if the params have
.hex extension, otherwise binary format is used''')
def sign(key, public_key_format, align, version, pad_sig, header_size,
pad_header, slot_size, pad, confirm, max_sectors, overwrite_only,
endian, encrypt_keylen, encrypt, compression, infile, outfile,
dependencies, load_addr, hex_addr, erased_val, save_enctlv,
security_counter, boot_record, custom_tlv, rom_fixed, max_align,
clear, fix_sig, fix_sig_pubkey, sig_out, user_sha, is_pure,
vector_to_sign, non_bootable):
if confirm:
# Confirmed but non-padded images don't make much sense, because
# otherwise there's no trailer area for writing the confirmed status.
pad = True
img = image.Image(version=decode_version(version), header_size=header_size,
pad_header=pad_header, pad=pad, confirm=confirm,
align=int(align), slot_size=slot_size,
max_sectors=max_sectors, overwrite_only=overwrite_only,
endian=endian, load_addr=load_addr, rom_fixed=rom_fixed,
erased_val=erased_val, save_enctlv=save_enctlv,
security_counter=security_counter, max_align=max_align,
non_bootable=non_bootable)
compression_tlvs = {}
img.load(infile)
key = load_key(key) if key else None
enckey = load_key(encrypt) if encrypt else None
if enckey and key:
if ((isinstance(key, keys.ECDSA256P1) and
not isinstance(enckey, keys.ECDSA256P1Public))
or (isinstance(key, keys.ECDSA384P1) and
not isinstance(enckey, keys.ECDSA384P1Public))
or (isinstance(key, keys.RSA) and
not isinstance(enckey, keys.RSAPublic))):
# FIXME
raise click.UsageError("Signing and encryption must use the same "
"type of key")
if pad_sig and hasattr(key, 'pad_sig'):
key.pad_sig = True
# Get list of custom protected TLVs from the command-line
custom_tlvs = {}
for tlv in custom_tlv:
tag = int(tlv[0], 0)
if tag in custom_tlvs:
raise click.UsageError('Custom TLV %s already exists.' % hex(tag))
if tag in image.TLV_VALUES.values():
raise click.UsageError(
'Custom TLV %s conflicts with predefined TLV.' % hex(tag))
value = tlv[1]
if value.startswith('0x'):
if len(value[2:]) % 2:
raise click.UsageError('Custom TLV length is odd.')
custom_tlvs[tag] = bytes.fromhex(value[2:])
else:
custom_tlvs[tag] = value.encode('utf-8')
# Allow signature calculated externally.
raw_signature = load_signature(fix_sig) if fix_sig else None
baked_signature = None
pub_key = None
if raw_signature is not None:
if fix_sig_pubkey is None:
raise click.UsageError(
'public key of the fixed signature is not specified')
pub_key = load_key(fix_sig_pubkey)
baked_signature = {
'value': raw_signature
}
if is_pure and user_sha != 'auto':
raise click.UsageError(
'Pure signatures, currently, enforces preferred hash algorithm, '
'and forbids sha selection by user.')
img.create(key, public_key_format, enckey, dependencies, boot_record,
custom_tlvs, compression_tlvs, None, int(encrypt_keylen), clear,
baked_signature, pub_key, vector_to_sign, user_sha=user_sha,
is_pure=is_pure)
if compression in ["lzma2", "lzma2armthumb"]:
compressed_img = image.Image(version=decode_version(version),
header_size=header_size, pad_header=pad_header,
pad=pad, confirm=confirm, align=int(align),
slot_size=slot_size, max_sectors=max_sectors,
overwrite_only=overwrite_only, endian=endian,
load_addr=load_addr, rom_fixed=rom_fixed,
erased_val=erased_val, save_enctlv=save_enctlv,
security_counter=security_counter, max_align=max_align)
compression_filters = [
{"id": lzma.FILTER_LZMA2, "preset": comp_default_preset,
"dict_size": comp_default_dictsize, "lp": comp_default_lp,
"lc": comp_default_lc}
]
if compression == "lzma2armthumb":
compression_filters.insert(0, {"id":lzma.FILTER_ARMTHUMB})
compressed_data = lzma.compress(img.get_infile_data(),filters=compression_filters,
format=lzma.FORMAT_RAW)
uncompressed_size = len(img.get_infile_data())
compressed_size = len(compressed_data)
print(f"compressed image size: {compressed_size} bytes")
print(f"original image size: {uncompressed_size} bytes")
compression_tlvs["DECOMP_SIZE"] = struct.pack(
img.get_struct_endian() + 'L', img.image_size)
compression_tlvs["DECOMP_SHA"] = img.image_hash
compression_tlvs_size = len(compression_tlvs["DECOMP_SIZE"])
compression_tlvs_size += len(compression_tlvs["DECOMP_SHA"])
if img.get_signature():
compression_tlvs["DECOMP_SIGNATURE"] = img.get_signature()
compression_tlvs_size += len(compression_tlvs["DECOMP_SIGNATURE"])
if (compressed_size + compression_tlvs_size) < uncompressed_size:
compression_header = create_lzma2_header(
dictsize = comp_default_dictsize, pb = comp_default_pb,
lc = comp_default_lc, lp = comp_default_lp)
compressed_img.load_compressed(compressed_data, compression_header)
compressed_img.base_addr = img.base_addr
compressed_img.create(key, public_key_format, enckey,
dependencies, boot_record, custom_tlvs, compression_tlvs,
compression, int(encrypt_keylen), clear, baked_signature,
pub_key, vector_to_sign)
img = compressed_img
img.save(outfile, hex_addr)
if sig_out is not None:
new_signature = img.get_signature()
save_signature(sig_out, new_signature)
class AliasesGroup(click.Group):
_aliases = {
"create": "sign",
}
def list_commands(self, ctx):
cmds = [k for k in self.commands]
aliases = [k for k in self._aliases]
return sorted(cmds + aliases)
def get_command(self, ctx, cmd_name):
rv = click.Group.get_command(self, ctx, cmd_name)
if rv is not None:
return rv
if cmd_name in self._aliases:
return click.Group.get_command(self, ctx, self._aliases[cmd_name])
return None
@click.command(help='Print imgtool version information')
def version():
print(imgtool_version)
@click.command(cls=AliasesGroup,
context_settings=dict(help_option_names=['-h', '--help']))
def imgtool():
pass
imgtool.add_command(keygen)
imgtool.add_command(getpub)
imgtool.add_command(getpubhash)
imgtool.add_command(getpriv)
imgtool.add_command(verify)
imgtool.add_command(sign)
imgtool.add_command(version)
imgtool.add_command(dumpinfo)
if __name__ == '__main__':
imgtool()

View File

@@ -0,0 +1,56 @@
# Copyright 2017 Linaro Limited
# Copyright 2024 Arm Limited
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Semi Semantic Versioning
Implements a subset of semantic versioning that is supportable by the image
header.
"""
import re
import sys
from collections import namedtuple
SemiSemVersion = namedtuple('SemiSemVersion', ['major', 'minor', 'revision',
'build'])
version_re = re.compile(
r"""^([1-9]\d*|0)(\.([1-9]\d*|0)(\.([1-9]\d*|0)(\+([1-9]\d*|0))?)?)?$""")
def decode_version(text):
"""Decode the version string, which should be of the form maj.min.rev+build
"""
m = version_re.match(text)
if m:
result = SemiSemVersion(
int(m.group(1)) if m.group(1) else 0,
int(m.group(3)) if m.group(3) else 0,
int(m.group(5)) if m.group(5) else 0,
int(m.group(7)) if m.group(7) else 0)
return result
else:
msg = "Invalid version number, should be maj.min.rev+build with later "
msg += "parts optional"
raise ValueError(msg)
if __name__ == '__main__':
if len(sys.argv) > 1:
print(decode_version(sys.argv[1]))
else:
print("Requires an argument, e.g. '1.0.0'")

View File

@@ -0,0 +1,8 @@
#! /bin/bash
#
# SPDX-License-Identifier: Apache-2.0
source $(dirname $0)/../target.sh
# Start the jlink gdb server
JLinkGDBServer -if swd -device $SOC -speed auto

View File

@@ -0,0 +1,7 @@
#!/bin/bash
#
# SPDX-License-Identifier: Apache-2.0
source $(dirname $0)/../target.sh
JLinkExe -speed auto -si SWD -device $SOC

View File

@@ -0,0 +1,135 @@
// Copyright (C) 2019, Linaro Ltd
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is a Binary Template file for the 010 Editor
// (http://www.sweetscape.com/010editor/) to allow it to show the
// structure of an MCUboot image.
LittleEndian();
struct ENTRY {
uint32 id;
uint32 offset;
uint32 size;
uint32 pad;
};
// The simulator writes the partition table at the beginning of the
// image, so that we can tell where the partitions are. If you are
// trying to view an image captured from a device, you can either
// construct a synthetic partition table in the file, or change code
// described below to hardcode one.
struct PTABLE {
uchar pheader[8];
if (ptable.pheader != "mcuboot\0") {
// NOTE: Put code here to hard code a partition table, and
// continue.
Warning("Invalid magic on ptable header");
return -1;
} else {
uint32 count;
struct ENTRY entries[count];
}
};
struct PTABLE ptable;
struct IMAGE_VERSION {
uchar major;
uchar minor;
uint16 revision;
uint32 build_num;
};
struct IHDR {
uint32 magic <format=hex>;
uint32 load_addr <format=hex>;
uint16 hdr_size <format=hex>;
uint16 protect_size <format=hex>;
uint32 img_size <format=hex>;
uint32 flags;
struct IMAGE_VERSION ver;
uint32 _pad1;
};
struct TLV_HDR {
uint16 magic;
uint16 tlv_tot;
};
struct TLV {
uchar type <format=hex>;
uchar pad;
uint16 len;
switch (type) {
case 0x01: // keyhash
uchar keyhash[len];
break;
case 0x40: // dependency
if (len != 12) {
Warning("Invalid dependency size");
return -1;
}
uchar image_id;
uchar pad1;
uint16 pad2;
struct IMAGE_VERSION version;
break;
default:
// Other, just consume the data.
uchar data[len];
}
};
local int i;
local int epos;
for (i = 0; i < ptable.count; i++) {
FSeek(ptable.entries[i].offset);
switch (ptable.entries[i].id) {
case 1:
case 2:
case 4:
case 5:
struct IMAGE {
struct IHDR ihdr;
if (ihdr.magic == 0x96f3b83d) {
uchar payload[ihdr.img_size];
epos = FTell();
struct TLV_HDR tlv_hdr;
if (tlv_hdr.magic == 0x6907) {
epos += tlv_hdr.tlv_tot;
while (FTell() < epos) {
struct TLV tlv;
}
}
}
// uchar block[ptable.entries[i].size];
} image;
break;
case 3:
struct SCRATCH {
uchar data[ptable.entries[i].size];
} scratch;
break;
default:
break;
}
}

View File

@@ -0,0 +1,7 @@
cryptography>=40.0.0
intelhex
click
cbor2
setuptools
pyyaml
pytest

View File

@@ -0,0 +1,33 @@
# SPDX-License-Identifier: Apache-2.0
import setuptools
from imgtool import imgtool_version
setuptools.setup(
name="imgtool",
version=imgtool_version,
author="The MCUboot committers",
author_email="mcuboot@groups.io",
description=("MCUboot's image signing and key management"),
license="Apache Software License",
url="http://github.com/mcu-tools/mcuboot",
packages=setuptools.find_packages(),
python_requires='>=3.6',
install_requires=[
'cryptography>=40.0.0',
'intelhex>=2.2.1',
'click',
'cbor2',
'pyyaml',
],
entry_points={
"console_scripts": ["imgtool=imgtool.main:imgtool"]
},
classifiers=[
"Programming Language :: Python :: 3",
"Development Status :: 4 - Beta",
"Topic :: Software Development :: Build Tools",
"License :: OSI Approved :: Apache Software License",
],
)

View File

@@ -0,0 +1,31 @@
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
# List of tests expected to fail for some reason
XFAILED_TESTS = {
"tests/test_keys.py::test_getpriv[openssl-ed25519]",
"tests/test_keys.py::test_getpriv[openssl-x25519]",
"tests/test_keys.py::test_getpriv[pkcs8-rsa-2048]",
"tests/test_keys.py::test_getpriv[pkcs8-rsa-3072]",
"tests/test_keys.py::test_getpriv[pkcs8-ed25519]",
"tests/test_keys.py::test_getpub[pem-ed25519]",
"tests/test_keys.py::test_sign_verify[x25519]",
}
def pytest_runtest_setup(item):
if item.nodeid in XFAILED_TESTS:
pytest.xfail()

View File

@@ -0,0 +1,112 @@
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from click.testing import CliRunner
from imgtool.main import imgtool
from imgtool import imgtool_version
# all available imgtool commands
COMMANDS = [
"create",
"dumpinfo",
"getpriv",
"getpub",
"getpubhash",
"keygen",
"sign",
"verify",
"version",
]
def test_new_command():
"""Check that no new commands had been added,
so that tests would be updated in such case"""
for cmd in imgtool.commands:
assert cmd in COMMANDS
def test_help():
"""Simple test for the imgtool's help option,
mostly just to see that it can be started"""
runner = CliRunner()
result_short = runner.invoke(imgtool, ["-h"])
assert result_short.exit_code == 0
result_long = runner.invoke(imgtool, ["--help"])
assert result_long.exit_code == 0
assert result_short.output == result_long.output
# by default help should be also produced
result_empty = runner.invoke(imgtool)
assert result_empty.exit_code == 0
assert result_empty.output == result_short.output
def test_version():
"""Check that some version info is produced"""
runner = CliRunner()
result = runner.invoke(imgtool, ["version"])
assert result.exit_code == 0
assert result.output == imgtool_version + "\n"
result_help = runner.invoke(imgtool, ["version", "-h"])
assert result_help.exit_code == 0
assert result_help.output != result.output
def test_unknown():
"""Check that unknown command will be handled"""
runner = CliRunner()
result = runner.invoke(imgtool, ["unknown"])
assert result.exit_code != 0
@pytest.mark.parametrize("command", COMMANDS)
def test_cmd_help(command):
"""Check that all commands have some help"""
runner = CliRunner()
result_short = runner.invoke(imgtool, [command, "-h"])
assert result_short.exit_code == 0
result_long = runner.invoke(imgtool, [command, "--help"])
assert result_long.exit_code == 0
assert result_short.output == result_long.output
@pytest.mark.parametrize("command1", COMMANDS)
@pytest.mark.parametrize("command2", COMMANDS)
def test_cmd_dif_help(command1, command2):
"""Check that all commands have some different help"""
runner = CliRunner()
result_general = runner.invoke(imgtool, "--help")
assert result_general.exit_code == 0
result_cmd1 = runner.invoke(imgtool, [command1, "--help"])
assert result_cmd1.exit_code == 0
assert result_cmd1.output != result_general.output
if command1 != command2:
result_cmd2 = runner.invoke(imgtool, [command2, "--help"])
assert result_cmd2.exit_code == 0
assert result_cmd1.output != result_cmd2.output

View File

@@ -0,0 +1,253 @@
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
import subprocess
from click.testing import CliRunner
from imgtool import main as imgtool_main
from imgtool.main import imgtool
# all supported key types for 'keygen'
KEY_TYPES = [*imgtool_main.keygens]
KEY_ENCODINGS = [*imgtool_main.valid_encodings]
PUB_HASH_ENCODINGS = [*imgtool_main.valid_hash_encodings]
PVT_KEY_FORMATS = [*imgtool_main.valid_formats]
OPENSSL_KEY_TYPES = {
"rsa-2048": "Private-Key: (2048 bit, 2 primes)",
"rsa-3072": "Private-Key: (3072 bit, 2 primes)",
"ecdsa-p256": "Private-Key: (256 bit)",
"ecdsa-p384": "Private-Key: (384 bit)",
"ed25519": "ED25519 Private-Key:",
"x25519": "X25519 Private-Key:",
}
GEN_KEY_EXT = ".key"
GEN_ANOTHER_KEY_EXT = ".another.key"
PUB_KEY_EXT = ".pub"
PUB_KEY_HASH_EXT = ".pubhash"
def tmp_name(tmp_path, key_type, suffix=""):
return tmp_path / (key_type + suffix)
@pytest.fixture(scope="session")
def tmp_path_persistent(tmp_path_factory):
return tmp_path_factory.mktemp("keys")
@pytest.mark.parametrize("key_type", KEY_TYPES)
def test_keygen(key_type, tmp_path_persistent):
"""Generate keys by imgtool"""
runner = CliRunner()
gen_key = tmp_name(tmp_path_persistent, key_type, GEN_KEY_EXT)
assert not gen_key.exists()
result = runner.invoke(
imgtool, ["keygen", "--key", str(gen_key), "--type", key_type]
)
assert result.exit_code == 0
assert gen_key.exists()
assert gen_key.stat().st_size > 0
# another key
gen_key2 = tmp_name(tmp_path_persistent, key_type, GEN_ANOTHER_KEY_EXT)
assert str(gen_key2) != str(gen_key)
assert not gen_key2.exists()
result = runner.invoke(
imgtool, ["keygen", "--key", str(gen_key2), "--type", key_type]
)
assert result.exit_code == 0
assert gen_key2.exists()
assert gen_key2.stat().st_size > 0
# content must be different
assert gen_key.read_bytes() != gen_key2.read_bytes()
@pytest.mark.parametrize("key_type", KEY_TYPES)
def test_keygen_type(key_type, tmp_path_persistent):
"""Check generated keys"""
assert key_type in OPENSSL_KEY_TYPES
gen_key = tmp_name(tmp_path_persistent, key_type, GEN_KEY_EXT)
result = subprocess.run(
["openssl", "pkey", "-in", str(gen_key), "-check", "-noout", "-text"],
capture_output=True,
text=True,
)
assert result.returncode == 0
assert "Key is valid" in result.stdout
assert OPENSSL_KEY_TYPES[key_type] in result.stdout
@pytest.mark.parametrize("key_type", KEY_TYPES)
@pytest.mark.parametrize("format", PVT_KEY_FORMATS)
def test_getpriv(key_type, format, tmp_path_persistent):
"""Get private key"""
runner = CliRunner()
gen_key = tmp_name(tmp_path_persistent, key_type, GEN_KEY_EXT)
result = runner.invoke(
imgtool,
[
"getpriv",
"--key",
str(gen_key),
"--format",
format,
],
)
assert result.exit_code == 0
@pytest.mark.parametrize("key_type", KEY_TYPES)
@pytest.mark.parametrize("encoding", KEY_ENCODINGS)
def test_getpub(key_type, encoding, tmp_path_persistent):
"""Get public key"""
runner = CliRunner()
gen_key = tmp_name(tmp_path_persistent, key_type, GEN_KEY_EXT)
pub_key = tmp_name(tmp_path_persistent, key_type, PUB_KEY_EXT
+ "." + encoding)
assert not pub_key.exists()
result = runner.invoke(
imgtool,
[
"getpub",
"--key",
str(gen_key),
"--output",
str(pub_key),
"--encoding",
encoding,
],
)
assert result.exit_code == 0
assert pub_key.exists()
assert pub_key.stat().st_size > 0
@pytest.mark.parametrize("key_type", KEY_TYPES)
@pytest.mark.parametrize("encoding", PUB_HASH_ENCODINGS)
def test_getpubhash(key_type, encoding, tmp_path_persistent):
"""Get the hash of the public key"""
runner = CliRunner()
gen_key = tmp_name(tmp_path_persistent, key_type, GEN_KEY_EXT)
pub_key_hash = tmp_name(
tmp_path_persistent, key_type, PUB_KEY_HASH_EXT + "." + encoding
)
assert not pub_key_hash.exists()
result = runner.invoke(
imgtool,
[
"getpubhash",
"--key",
str(gen_key),
"--output",
str(pub_key_hash),
"--encoding",
encoding,
],
)
assert result.exit_code == 0
assert pub_key_hash.exists()
assert pub_key_hash.stat().st_size > 0
@pytest.mark.parametrize("key_type", KEY_TYPES)
def test_sign_verify(key_type, tmp_path_persistent):
"""Test basic sign and verify"""
runner = CliRunner()
gen_key = tmp_name(tmp_path_persistent, key_type, GEN_KEY_EXT)
wrong_key = tmp_name(tmp_path_persistent, key_type, GEN_ANOTHER_KEY_EXT)
image = tmp_name(tmp_path_persistent, "image", "bin")
image_signed = tmp_name(tmp_path_persistent, "image", "signed")
with image.open("wb") as f:
f.write(b"\x00" * 1024)
# not all required arguments are provided
result = runner.invoke(
imgtool,
[
"sign",
"--key",
str(gen_key),
str(image),
str(image_signed),
],
)
assert result.exit_code != 0
assert not image_signed.exists()
result = runner.invoke(
imgtool,
[
"sign",
"--key",
str(gen_key),
"--align",
"16",
"--version",
"1.0.0",
"--header-size",
"0x400",
"--slot-size",
"0x10000",
"--pad-header",
str(image),
str(image_signed),
],
)
assert result.exit_code == 0
assert image_signed.exists()
assert image_signed.stat().st_size > 0
# original key can be used to verify a signed image
result = runner.invoke(
imgtool,
[
"verify",
"--key",
str(gen_key),
str(image_signed),
],
)
assert result.exit_code == 0
# 'another' key is not valid to verify a signed image
result = runner.invoke(
imgtool,
[
"verify",
"--key",
str(wrong_key),
str(image_signed),
],
)
assert result.exit_code != 0
image_signed.unlink()