diff options
Diffstat (limited to 'tools/crypto/ccp')
-rw-r--r-- | tools/crypto/ccp/.gitignore | 1 | ||||
-rw-r--r-- | tools/crypto/ccp/Makefile | 13 | ||||
-rw-r--r-- | tools/crypto/ccp/dbc.c | 73 | ||||
-rw-r--r-- | tools/crypto/ccp/dbc.py | 64 | ||||
-rwxr-xr-x | tools/crypto/ccp/dbc_cli.py | 134 | ||||
-rwxr-xr-x | tools/crypto/ccp/test_dbc.py | 275 |
6 files changed, 560 insertions, 0 deletions
diff --git a/tools/crypto/ccp/.gitignore b/tools/crypto/ccp/.gitignore new file mode 100644 index 0000000000..bee8a64b79 --- /dev/null +++ b/tools/crypto/ccp/.gitignore @@ -0,0 +1 @@ +__pycache__ diff --git a/tools/crypto/ccp/Makefile b/tools/crypto/ccp/Makefile new file mode 100644 index 0000000000..ae4a66d155 --- /dev/null +++ b/tools/crypto/ccp/Makefile @@ -0,0 +1,13 @@ +# SPDX-License-Identifier: GPL-2.0-only +CFLAGS += -D__EXPORTED_HEADERS__ -I../../../include/uapi -I../../../include + +TARGET = dbc_library.so + +all: $(TARGET) + +dbc_library.so: dbc.c + $(CC) $(CFLAGS) $(LDFLAGS) -shared -o $@ $< + chmod -x $@ + +clean: + $(RM) $(TARGET) diff --git a/tools/crypto/ccp/dbc.c b/tools/crypto/ccp/dbc.c new file mode 100644 index 0000000000..a807df0f05 --- /dev/null +++ b/tools/crypto/ccp/dbc.c @@ -0,0 +1,73 @@ +// SPDX-License-Identifier: GPL-2.0-only +/* + * AMD Secure Processor Dynamic Boost Control sample library + * + * Copyright (C) 2023 Advanced Micro Devices, Inc. + * + * Author: Mario Limonciello <mario.limonciello@amd.com> + */ + +#include <assert.h> +#include <errno.h> +#include <string.h> +#include <sys/ioctl.h> + +/* if uapi header isn't installed, this might not yet exist */ +#ifndef __packed +#define __packed __attribute__((packed)) +#endif +#include <linux/psp-dbc.h> + +int get_nonce(int fd, void *nonce_out, void *signature) +{ + struct dbc_user_nonce tmp = { + .auth_needed = !!signature, + }; + + assert(nonce_out); + + if (signature) + memcpy(tmp.signature, signature, sizeof(tmp.signature)); + + if (ioctl(fd, DBCIOCNONCE, &tmp)) + return errno; + memcpy(nonce_out, tmp.nonce, sizeof(tmp.nonce)); + + return 0; +} + +int set_uid(int fd, __u8 *uid, __u8 *signature) +{ + struct dbc_user_setuid tmp; + + assert(uid); + assert(signature); + + memcpy(tmp.uid, uid, sizeof(tmp.uid)); + memcpy(tmp.signature, signature, sizeof(tmp.signature)); + + if (ioctl(fd, DBCIOCUID, &tmp)) + return errno; + return 0; +} + +int process_param(int fd, int msg_index, __u8 *signature, int *data) +{ + struct dbc_user_param tmp = { + .msg_index = msg_index, + .param = *data, + }; + int ret; + + assert(signature); + assert(data); + + memcpy(tmp.signature, signature, sizeof(tmp.signature)); + + if (ioctl(fd, DBCIOCPARAM, &tmp)) + return errno; + + *data = tmp.param; + memcpy(signature, tmp.signature, sizeof(tmp.signature)); + return 0; +} diff --git a/tools/crypto/ccp/dbc.py b/tools/crypto/ccp/dbc.py new file mode 100644 index 0000000000..2b91415b19 --- /dev/null +++ b/tools/crypto/ccp/dbc.py @@ -0,0 +1,64 @@ +#!/usr/bin/python3 +# SPDX-License-Identifier: GPL-2.0 + +import ctypes +import os + +DBC_UID_SIZE = 16 +DBC_NONCE_SIZE = 16 +DBC_SIG_SIZE = 32 + +PARAM_GET_FMAX_CAP = (0x3,) +PARAM_SET_FMAX_CAP = (0x4,) +PARAM_GET_PWR_CAP = (0x5,) +PARAM_SET_PWR_CAP = (0x6,) +PARAM_GET_GFX_MODE = (0x7,) +PARAM_SET_GFX_MODE = (0x8,) +PARAM_GET_CURR_TEMP = (0x9,) +PARAM_GET_FMAX_MAX = (0xA,) +PARAM_GET_FMAX_MIN = (0xB,) +PARAM_GET_SOC_PWR_MAX = (0xC,) +PARAM_GET_SOC_PWR_MIN = (0xD,) +PARAM_GET_SOC_PWR_CUR = (0xE,) + +DEVICE_NODE = "/dev/dbc" + +lib = ctypes.CDLL("./dbc_library.so", mode=ctypes.RTLD_GLOBAL) + + +def handle_error(code): + raise OSError(code, os.strerror(code)) + + +def get_nonce(device, signature): + if not device: + raise ValueError("Device required") + buf = ctypes.create_string_buffer(DBC_NONCE_SIZE) + ret = lib.get_nonce(device.fileno(), ctypes.byref(buf), signature) + if ret: + handle_error(ret) + return buf.value + + +def set_uid(device, new_uid, signature): + if not signature: + raise ValueError("Signature required") + if not new_uid: + raise ValueError("UID required") + ret = lib.set_uid(device.fileno(), new_uid, signature) + if ret: + handle_error(ret) + return True + + +def process_param(device, message, signature, data=None): + if not signature: + raise ValueError("Signature required") + if type(message) != tuple: + raise ValueError("Expected message tuple") + arg = ctypes.c_int(data if data else 0) + sig = ctypes.create_string_buffer(signature, len(signature)) + ret = lib.process_param(device.fileno(), message[0], ctypes.pointer(sig), ctypes.pointer(arg)) + if ret: + handle_error(ret) + return arg.value, sig.value diff --git a/tools/crypto/ccp/dbc_cli.py b/tools/crypto/ccp/dbc_cli.py new file mode 100755 index 0000000000..bf52233fd0 --- /dev/null +++ b/tools/crypto/ccp/dbc_cli.py @@ -0,0 +1,134 @@ +#!/usr/bin/python3 +# SPDX-License-Identifier: GPL-2.0 +import argparse +import binascii +import os +import errno +from dbc import * + +ERRORS = { + errno.EACCES: "Access is denied", + errno.E2BIG: "Excess data provided", + errno.EINVAL: "Bad parameters", + errno.EAGAIN: "Bad state", + errno.ENOENT: "Not implemented or message failure", + errno.EBUSY: "Busy", + errno.ENFILE: "Overflow", + errno.EPERM: "Signature invalid", +} + +messages = { + "get-fmax-cap": PARAM_GET_FMAX_CAP, + "set-fmax-cap": PARAM_SET_FMAX_CAP, + "get-power-cap": PARAM_GET_PWR_CAP, + "set-power-cap": PARAM_SET_PWR_CAP, + "get-graphics-mode": PARAM_GET_GFX_MODE, + "set-graphics-mode": PARAM_SET_GFX_MODE, + "get-current-temp": PARAM_GET_CURR_TEMP, + "get-fmax-max": PARAM_GET_FMAX_MAX, + "get-fmax-min": PARAM_GET_FMAX_MIN, + "get-soc-power-max": PARAM_GET_SOC_PWR_MAX, + "get-soc-power-min": PARAM_GET_SOC_PWR_MIN, + "get-soc-power-cur": PARAM_GET_SOC_PWR_CUR, +} + + +def _pretty_buffer(ba): + return str(binascii.hexlify(ba, " ")) + + +def parse_args(): + parser = argparse.ArgumentParser( + description="Dynamic Boost control command line interface" + ) + parser.add_argument( + "command", + choices=["get-nonce", "get-param", "set-param", "set-uid"], + help="Command to send", + ) + parser.add_argument("--device", default="/dev/dbc", help="Device to operate") + parser.add_argument("--signature", help="File containing signature for command") + parser.add_argument("--message", choices=messages.keys(), help="Message index") + parser.add_argument("--data", help="Argument to pass to message") + parser.add_argument("--uid", help="File containing UID to pass") + return parser.parse_args() + + +def pretty_error(code): + if code in ERRORS: + print(ERRORS[code]) + else: + print("failed with return code %d" % code) + + +if __name__ == "__main__": + args = parse_args() + data = 0 + sig = None + uid = None + if not os.path.exists(args.device): + raise IOError("Missing device {device}".format(device=args.device)) + if args.signature: + if not os.path.exists(args.signature): + raise ValueError("Invalid signature file %s" % args.signature) + with open(args.signature, "rb") as f: + sig = f.read() + if len(sig) != DBC_SIG_SIZE: + raise ValueError( + "Invalid signature length %d (expected %d)" % (len(sig), DBC_SIG_SIZE) + ) + if args.uid: + if not os.path.exists(args.uid): + raise ValueError("Invalid uid file %s" % args.uid) + with open(args.uid, "rb") as f: + uid = f.read() + if len(uid) != DBC_UID_SIZE: + raise ValueError( + "Invalid UID length %d (expected %d)" % (len(uid), DBC_UID_SIZE) + ) + if args.data: + try: + data = int(args.data, 10) + except ValueError: + data = int(args.data, 16) + + with open(args.device) as d: + if args.command == "get-nonce": + try: + nonce = get_nonce(d, sig) + print("Nonce: %s" % _pretty_buffer(bytes(nonce))) + except OSError as e: + pretty_error(e.errno) + elif args.command == "set-uid": + try: + result = set_uid(d, uid, sig) + if result: + print("Set UID") + except OSError as e: + pretty_error(e.errno) + elif args.command == "get-param": + if not args.message or args.message.startswith("set"): + raise ValueError("Invalid message %s" % args.message) + try: + param, signature = process_param(d, messages[args.message], sig) + print( + "Parameter: {par}, response signature {sig}".format( + par=param, + sig=_pretty_buffer(bytes(signature)), + ) + ) + except OSError as e: + pretty_error(e.errno) + elif args.command == "set-param": + if not args.message or args.message.startswith("get"): + raise ValueError("Invalid message %s" % args.message) + try: + param, signature = process_param(d, messages[args.message], sig, data) + print( + "Parameter: {par}, response signature {sig}".format( + par=param, + sig=_pretty_buffer(bytes(signature)), + ) + ) + except OSError as e: + pretty_error(e.errno) diff --git a/tools/crypto/ccp/test_dbc.py b/tools/crypto/ccp/test_dbc.py new file mode 100755 index 0000000000..79de3638a0 --- /dev/null +++ b/tools/crypto/ccp/test_dbc.py @@ -0,0 +1,275 @@ +#!/usr/bin/python3 +# SPDX-License-Identifier: GPL-2.0 +import unittest +import os +import time +import glob +import fcntl +try: + import ioctl_opt as ioctl +except ImportError: + ioctl = None + pass +from dbc import * + +# Artificial delay between set commands +SET_DELAY = 0.5 + + +class invalid_param(ctypes.Structure): + _fields_ = [ + ("data", ctypes.c_uint8), + ] + + +def system_is_secured() -> bool: + fused_part = glob.glob("/sys/bus/pci/drivers/ccp/**/fused_part")[0] + if os.path.exists(fused_part): + with open(fused_part, "r") as r: + return int(r.read()) == 1 + return True + + +class DynamicBoostControlTest(unittest.TestCase): + def __init__(self, data) -> None: + self.d = None + self.signature = b"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF" + self.uid = b"1111111111111111" + super().__init__(data) + + def setUp(self) -> None: + self.d = open(DEVICE_NODE) + return super().setUp() + + def tearDown(self) -> None: + if self.d: + self.d.close() + return super().tearDown() + + +class TestUnsupportedSystem(DynamicBoostControlTest): + def setUp(self) -> None: + if os.path.exists(DEVICE_NODE): + self.skipTest("system is supported") + with self.assertRaises(FileNotFoundError) as error: + super().setUp() + self.assertEqual(error.exception.errno, 2) + + def test_unauthenticated_nonce(self) -> None: + """fetch unauthenticated nonce""" + with self.assertRaises(ValueError) as error: + get_nonce(self.d, None) + + +class TestInvalidIoctls(DynamicBoostControlTest): + def __init__(self, data) -> None: + self.data = invalid_param() + self.data.data = 1 + super().__init__(data) + + def setUp(self) -> None: + if not os.path.exists(DEVICE_NODE): + self.skipTest("system is unsupported") + if not ioctl: + self.skipTest("unable to test IOCTLs without ioctl_opt") + + return super().setUp() + + def test_invalid_nonce_ioctl(self) -> None: + """tries to call get_nonce ioctl with invalid data structures""" + + # 0x1 (get nonce), and invalid data + INVALID1 = ioctl.IOWR(ord("D"), 0x01, invalid_param) + with self.assertRaises(OSError) as error: + fcntl.ioctl(self.d, INVALID1, self.data, True) + self.assertEqual(error.exception.errno, 22) + + def test_invalid_setuid_ioctl(self) -> None: + """tries to call set_uid ioctl with invalid data structures""" + + # 0x2 (set uid), and invalid data + INVALID2 = ioctl.IOW(ord("D"), 0x02, invalid_param) + with self.assertRaises(OSError) as error: + fcntl.ioctl(self.d, INVALID2, self.data, True) + self.assertEqual(error.exception.errno, 22) + + def test_invalid_setuid_rw_ioctl(self) -> None: + """tries to call set_uid ioctl with invalid data structures""" + + # 0x2 as RW (set uid), and invalid data + INVALID3 = ioctl.IOWR(ord("D"), 0x02, invalid_param) + with self.assertRaises(OSError) as error: + fcntl.ioctl(self.d, INVALID3, self.data, True) + self.assertEqual(error.exception.errno, 22) + + def test_invalid_param_ioctl(self) -> None: + """tries to call param ioctl with invalid data structures""" + # 0x3 (param), and invalid data + INVALID4 = ioctl.IOWR(ord("D"), 0x03, invalid_param) + with self.assertRaises(OSError) as error: + fcntl.ioctl(self.d, INVALID4, self.data, True) + self.assertEqual(error.exception.errno, 22) + + def test_invalid_call_ioctl(self) -> None: + """tries to call the DBC ioctl with invalid data structures""" + # 0x4, and invalid data + INVALID5 = ioctl.IOWR(ord("D"), 0x04, invalid_param) + with self.assertRaises(OSError) as error: + fcntl.ioctl(self.d, INVALID5, self.data, True) + self.assertEqual(error.exception.errno, 22) + + +class TestInvalidSignature(DynamicBoostControlTest): + def setUp(self) -> None: + if not os.path.exists(DEVICE_NODE): + self.skipTest("system is unsupported") + if not system_is_secured(): + self.skipTest("system is unfused") + return super().setUp() + + def test_unauthenticated_nonce(self) -> None: + """fetch unauthenticated nonce""" + get_nonce(self.d, None) + + def test_multiple_unauthenticated_nonce(self) -> None: + """ensure state machine always returns nonce""" + for count in range(0, 2): + get_nonce(self.d, None) + + def test_authenticated_nonce(self) -> None: + """fetch authenticated nonce""" + with self.assertRaises(OSError) as error: + get_nonce(self.d, self.signature) + self.assertEqual(error.exception.errno, 1) + + def test_set_uid(self) -> None: + """set uid""" + with self.assertRaises(OSError) as error: + set_uid(self.d, self.uid, self.signature) + self.assertEqual(error.exception.errno, 1) + + def test_get_param(self) -> None: + """fetch a parameter""" + with self.assertRaises(OSError) as error: + process_param(self.d, PARAM_GET_SOC_PWR_CUR, self.signature) + self.assertEqual(error.exception.errno, 1) + + def test_set_param(self) -> None: + """set a parameter""" + with self.assertRaises(OSError) as error: + process_param(self.d, PARAM_SET_PWR_CAP, self.signature, 1000) + self.assertEqual(error.exception.errno, 1) + + +class TestUnFusedSystem(DynamicBoostControlTest): + def setup_identity(self) -> None: + """sets up the identity of the caller""" + # if already authenticated these may fail + try: + get_nonce(self.d, None) + except PermissionError: + pass + try: + set_uid(self.d, self.uid, self.signature) + except BlockingIOError: + pass + try: + get_nonce(self.d, self.signature) + except PermissionError: + pass + + def setUp(self) -> None: + if not os.path.exists(DEVICE_NODE): + self.skipTest("system is unsupported") + if system_is_secured(): + self.skipTest("system is fused") + super().setUp() + self.setup_identity() + time.sleep(SET_DELAY) + + def test_get_valid_param(self) -> None: + """fetch all possible parameters""" + # SOC power + soc_power_max = process_param(self.d, PARAM_GET_SOC_PWR_MAX, self.signature) + soc_power_min = process_param(self.d, PARAM_GET_SOC_PWR_MIN, self.signature) + self.assertGreater(soc_power_max[0], soc_power_min[0]) + + # fmax + fmax_max = process_param(self.d, PARAM_GET_FMAX_MAX, self.signature) + fmax_min = process_param(self.d, PARAM_GET_FMAX_MIN, self.signature) + self.assertGreater(fmax_max[0], fmax_min[0]) + + # cap values + keys = { + "fmax-cap": PARAM_GET_FMAX_CAP, + "power-cap": PARAM_GET_PWR_CAP, + "current-temp": PARAM_GET_CURR_TEMP, + "soc-power-cur": PARAM_GET_SOC_PWR_CUR, + } + for k in keys: + result = process_param(self.d, keys[k], self.signature) + self.assertGreater(result[0], 0) + + def test_get_invalid_param(self) -> None: + """fetch an invalid parameter""" + try: + set_uid(self.d, self.uid, self.signature) + except OSError: + pass + with self.assertRaises(OSError) as error: + process_param(self.d, (0xF,), self.signature) + self.assertEqual(error.exception.errno, 22) + + def test_set_fmax(self) -> None: + """get/set fmax limit""" + # fetch current + original = process_param(self.d, PARAM_GET_FMAX_CAP, self.signature) + + # set the fmax + target = original[0] - 100 + process_param(self.d, PARAM_SET_FMAX_CAP, self.signature, target) + time.sleep(SET_DELAY) + new = process_param(self.d, PARAM_GET_FMAX_CAP, self.signature) + self.assertEqual(new[0], target) + + # revert back to current + process_param(self.d, PARAM_SET_FMAX_CAP, self.signature, original[0]) + time.sleep(SET_DELAY) + cur = process_param(self.d, PARAM_GET_FMAX_CAP, self.signature) + self.assertEqual(cur[0], original[0]) + + def test_set_power_cap(self) -> None: + """get/set power cap limit""" + # fetch current + original = process_param(self.d, PARAM_GET_PWR_CAP, self.signature) + + # set the fmax + target = original[0] - 10 + process_param(self.d, PARAM_SET_PWR_CAP, self.signature, target) + time.sleep(SET_DELAY) + new = process_param(self.d, PARAM_GET_PWR_CAP, self.signature) + self.assertEqual(new[0], target) + + # revert back to current + process_param(self.d, PARAM_SET_PWR_CAP, self.signature, original[0]) + time.sleep(SET_DELAY) + cur = process_param(self.d, PARAM_GET_PWR_CAP, self.signature) + self.assertEqual(cur[0], original[0]) + + def test_set_3d_graphics_mode(self) -> None: + """set/get 3d graphics mode""" + # these aren't currently implemented but may be some day + # they are *expected* to fail + with self.assertRaises(OSError) as error: + process_param(self.d, PARAM_GET_GFX_MODE, self.signature) + self.assertEqual(error.exception.errno, 2) + + time.sleep(SET_DELAY) + + with self.assertRaises(OSError) as error: + process_param(self.d, PARAM_SET_GFX_MODE, self.signature, 1) + self.assertEqual(error.exception.errno, 2) + + +if __name__ == "__main__": + unittest.main() |