summaryrefslogtreecommitdiffstats
path: root/tools/crypto/ccp
diff options
context:
space:
mode:
Diffstat (limited to 'tools/crypto/ccp')
-rw-r--r--tools/crypto/ccp/.gitignore1
-rw-r--r--tools/crypto/ccp/Makefile13
-rw-r--r--tools/crypto/ccp/dbc.c73
-rw-r--r--tools/crypto/ccp/dbc.py64
-rwxr-xr-xtools/crypto/ccp/dbc_cli.py134
-rwxr-xr-xtools/crypto/ccp/test_dbc.py275
6 files changed, 560 insertions, 0 deletions
diff --git a/tools/crypto/ccp/.gitignore b/tools/crypto/ccp/.gitignore
new file mode 100644
index 0000000000..bee8a64b79
--- /dev/null
+++ b/tools/crypto/ccp/.gitignore
@@ -0,0 +1 @@
+__pycache__
diff --git a/tools/crypto/ccp/Makefile b/tools/crypto/ccp/Makefile
new file mode 100644
index 0000000000..ae4a66d155
--- /dev/null
+++ b/tools/crypto/ccp/Makefile
@@ -0,0 +1,13 @@
+# SPDX-License-Identifier: GPL-2.0-only
+CFLAGS += -D__EXPORTED_HEADERS__ -I../../../include/uapi -I../../../include
+
+TARGET = dbc_library.so
+
+all: $(TARGET)
+
+dbc_library.so: dbc.c
+ $(CC) $(CFLAGS) $(LDFLAGS) -shared -o $@ $<
+ chmod -x $@
+
+clean:
+ $(RM) $(TARGET)
diff --git a/tools/crypto/ccp/dbc.c b/tools/crypto/ccp/dbc.c
new file mode 100644
index 0000000000..a807df0f05
--- /dev/null
+++ b/tools/crypto/ccp/dbc.c
@@ -0,0 +1,73 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * AMD Secure Processor Dynamic Boost Control sample library
+ *
+ * Copyright (C) 2023 Advanced Micro Devices, Inc.
+ *
+ * Author: Mario Limonciello <mario.limonciello@amd.com>
+ */
+
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/ioctl.h>
+
+/* if uapi header isn't installed, this might not yet exist */
+#ifndef __packed
+#define __packed __attribute__((packed))
+#endif
+#include <linux/psp-dbc.h>
+
+int get_nonce(int fd, void *nonce_out, void *signature)
+{
+ struct dbc_user_nonce tmp = {
+ .auth_needed = !!signature,
+ };
+
+ assert(nonce_out);
+
+ if (signature)
+ memcpy(tmp.signature, signature, sizeof(tmp.signature));
+
+ if (ioctl(fd, DBCIOCNONCE, &tmp))
+ return errno;
+ memcpy(nonce_out, tmp.nonce, sizeof(tmp.nonce));
+
+ return 0;
+}
+
+int set_uid(int fd, __u8 *uid, __u8 *signature)
+{
+ struct dbc_user_setuid tmp;
+
+ assert(uid);
+ assert(signature);
+
+ memcpy(tmp.uid, uid, sizeof(tmp.uid));
+ memcpy(tmp.signature, signature, sizeof(tmp.signature));
+
+ if (ioctl(fd, DBCIOCUID, &tmp))
+ return errno;
+ return 0;
+}
+
+int process_param(int fd, int msg_index, __u8 *signature, int *data)
+{
+ struct dbc_user_param tmp = {
+ .msg_index = msg_index,
+ .param = *data,
+ };
+ int ret;
+
+ assert(signature);
+ assert(data);
+
+ memcpy(tmp.signature, signature, sizeof(tmp.signature));
+
+ if (ioctl(fd, DBCIOCPARAM, &tmp))
+ return errno;
+
+ *data = tmp.param;
+ memcpy(signature, tmp.signature, sizeof(tmp.signature));
+ return 0;
+}
diff --git a/tools/crypto/ccp/dbc.py b/tools/crypto/ccp/dbc.py
new file mode 100644
index 0000000000..2b91415b19
--- /dev/null
+++ b/tools/crypto/ccp/dbc.py
@@ -0,0 +1,64 @@
+#!/usr/bin/python3
+# SPDX-License-Identifier: GPL-2.0
+
+import ctypes
+import os
+
+DBC_UID_SIZE = 16
+DBC_NONCE_SIZE = 16
+DBC_SIG_SIZE = 32
+
+PARAM_GET_FMAX_CAP = (0x3,)
+PARAM_SET_FMAX_CAP = (0x4,)
+PARAM_GET_PWR_CAP = (0x5,)
+PARAM_SET_PWR_CAP = (0x6,)
+PARAM_GET_GFX_MODE = (0x7,)
+PARAM_SET_GFX_MODE = (0x8,)
+PARAM_GET_CURR_TEMP = (0x9,)
+PARAM_GET_FMAX_MAX = (0xA,)
+PARAM_GET_FMAX_MIN = (0xB,)
+PARAM_GET_SOC_PWR_MAX = (0xC,)
+PARAM_GET_SOC_PWR_MIN = (0xD,)
+PARAM_GET_SOC_PWR_CUR = (0xE,)
+
+DEVICE_NODE = "/dev/dbc"
+
+lib = ctypes.CDLL("./dbc_library.so", mode=ctypes.RTLD_GLOBAL)
+
+
+def handle_error(code):
+ raise OSError(code, os.strerror(code))
+
+
+def get_nonce(device, signature):
+ if not device:
+ raise ValueError("Device required")
+ buf = ctypes.create_string_buffer(DBC_NONCE_SIZE)
+ ret = lib.get_nonce(device.fileno(), ctypes.byref(buf), signature)
+ if ret:
+ handle_error(ret)
+ return buf.value
+
+
+def set_uid(device, new_uid, signature):
+ if not signature:
+ raise ValueError("Signature required")
+ if not new_uid:
+ raise ValueError("UID required")
+ ret = lib.set_uid(device.fileno(), new_uid, signature)
+ if ret:
+ handle_error(ret)
+ return True
+
+
+def process_param(device, message, signature, data=None):
+ if not signature:
+ raise ValueError("Signature required")
+ if type(message) != tuple:
+ raise ValueError("Expected message tuple")
+ arg = ctypes.c_int(data if data else 0)
+ sig = ctypes.create_string_buffer(signature, len(signature))
+ ret = lib.process_param(device.fileno(), message[0], ctypes.pointer(sig), ctypes.pointer(arg))
+ if ret:
+ handle_error(ret)
+ return arg.value, sig.value
diff --git a/tools/crypto/ccp/dbc_cli.py b/tools/crypto/ccp/dbc_cli.py
new file mode 100755
index 0000000000..bf52233fd0
--- /dev/null
+++ b/tools/crypto/ccp/dbc_cli.py
@@ -0,0 +1,134 @@
+#!/usr/bin/python3
+# SPDX-License-Identifier: GPL-2.0
+import argparse
+import binascii
+import os
+import errno
+from dbc import *
+
+ERRORS = {
+ errno.EACCES: "Access is denied",
+ errno.E2BIG: "Excess data provided",
+ errno.EINVAL: "Bad parameters",
+ errno.EAGAIN: "Bad state",
+ errno.ENOENT: "Not implemented or message failure",
+ errno.EBUSY: "Busy",
+ errno.ENFILE: "Overflow",
+ errno.EPERM: "Signature invalid",
+}
+
+messages = {
+ "get-fmax-cap": PARAM_GET_FMAX_CAP,
+ "set-fmax-cap": PARAM_SET_FMAX_CAP,
+ "get-power-cap": PARAM_GET_PWR_CAP,
+ "set-power-cap": PARAM_SET_PWR_CAP,
+ "get-graphics-mode": PARAM_GET_GFX_MODE,
+ "set-graphics-mode": PARAM_SET_GFX_MODE,
+ "get-current-temp": PARAM_GET_CURR_TEMP,
+ "get-fmax-max": PARAM_GET_FMAX_MAX,
+ "get-fmax-min": PARAM_GET_FMAX_MIN,
+ "get-soc-power-max": PARAM_GET_SOC_PWR_MAX,
+ "get-soc-power-min": PARAM_GET_SOC_PWR_MIN,
+ "get-soc-power-cur": PARAM_GET_SOC_PWR_CUR,
+}
+
+
+def _pretty_buffer(ba):
+ return str(binascii.hexlify(ba, " "))
+
+
+def parse_args():
+ parser = argparse.ArgumentParser(
+ description="Dynamic Boost control command line interface"
+ )
+ parser.add_argument(
+ "command",
+ choices=["get-nonce", "get-param", "set-param", "set-uid"],
+ help="Command to send",
+ )
+ parser.add_argument("--device", default="/dev/dbc", help="Device to operate")
+ parser.add_argument("--signature", help="File containing signature for command")
+ parser.add_argument("--message", choices=messages.keys(), help="Message index")
+ parser.add_argument("--data", help="Argument to pass to message")
+ parser.add_argument("--uid", help="File containing UID to pass")
+ return parser.parse_args()
+
+
+def pretty_error(code):
+ if code in ERRORS:
+ print(ERRORS[code])
+ else:
+ print("failed with return code %d" % code)
+
+
+if __name__ == "__main__":
+ args = parse_args()
+ data = 0
+ sig = None
+ uid = None
+ if not os.path.exists(args.device):
+ raise IOError("Missing device {device}".format(device=args.device))
+ if args.signature:
+ if not os.path.exists(args.signature):
+ raise ValueError("Invalid signature file %s" % args.signature)
+ with open(args.signature, "rb") as f:
+ sig = f.read()
+ if len(sig) != DBC_SIG_SIZE:
+ raise ValueError(
+ "Invalid signature length %d (expected %d)" % (len(sig), DBC_SIG_SIZE)
+ )
+ if args.uid:
+ if not os.path.exists(args.uid):
+ raise ValueError("Invalid uid file %s" % args.uid)
+ with open(args.uid, "rb") as f:
+ uid = f.read()
+ if len(uid) != DBC_UID_SIZE:
+ raise ValueError(
+ "Invalid UID length %d (expected %d)" % (len(uid), DBC_UID_SIZE)
+ )
+ if args.data:
+ try:
+ data = int(args.data, 10)
+ except ValueError:
+ data = int(args.data, 16)
+
+ with open(args.device) as d:
+ if args.command == "get-nonce":
+ try:
+ nonce = get_nonce(d, sig)
+ print("Nonce: %s" % _pretty_buffer(bytes(nonce)))
+ except OSError as e:
+ pretty_error(e.errno)
+ elif args.command == "set-uid":
+ try:
+ result = set_uid(d, uid, sig)
+ if result:
+ print("Set UID")
+ except OSError as e:
+ pretty_error(e.errno)
+ elif args.command == "get-param":
+ if not args.message or args.message.startswith("set"):
+ raise ValueError("Invalid message %s" % args.message)
+ try:
+ param, signature = process_param(d, messages[args.message], sig)
+ print(
+ "Parameter: {par}, response signature {sig}".format(
+ par=param,
+ sig=_pretty_buffer(bytes(signature)),
+ )
+ )
+ except OSError as e:
+ pretty_error(e.errno)
+ elif args.command == "set-param":
+ if not args.message or args.message.startswith("get"):
+ raise ValueError("Invalid message %s" % args.message)
+ try:
+ param, signature = process_param(d, messages[args.message], sig, data)
+ print(
+ "Parameter: {par}, response signature {sig}".format(
+ par=param,
+ sig=_pretty_buffer(bytes(signature)),
+ )
+ )
+ except OSError as e:
+ pretty_error(e.errno)
diff --git a/tools/crypto/ccp/test_dbc.py b/tools/crypto/ccp/test_dbc.py
new file mode 100755
index 0000000000..79de3638a0
--- /dev/null
+++ b/tools/crypto/ccp/test_dbc.py
@@ -0,0 +1,275 @@
+#!/usr/bin/python3
+# SPDX-License-Identifier: GPL-2.0
+import unittest
+import os
+import time
+import glob
+import fcntl
+try:
+ import ioctl_opt as ioctl
+except ImportError:
+ ioctl = None
+ pass
+from dbc import *
+
+# Artificial delay between set commands
+SET_DELAY = 0.5
+
+
+class invalid_param(ctypes.Structure):
+ _fields_ = [
+ ("data", ctypes.c_uint8),
+ ]
+
+
+def system_is_secured() -> bool:
+ fused_part = glob.glob("/sys/bus/pci/drivers/ccp/**/fused_part")[0]
+ if os.path.exists(fused_part):
+ with open(fused_part, "r") as r:
+ return int(r.read()) == 1
+ return True
+
+
+class DynamicBoostControlTest(unittest.TestCase):
+ def __init__(self, data) -> None:
+ self.d = None
+ self.signature = b"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
+ self.uid = b"1111111111111111"
+ super().__init__(data)
+
+ def setUp(self) -> None:
+ self.d = open(DEVICE_NODE)
+ return super().setUp()
+
+ def tearDown(self) -> None:
+ if self.d:
+ self.d.close()
+ return super().tearDown()
+
+
+class TestUnsupportedSystem(DynamicBoostControlTest):
+ def setUp(self) -> None:
+ if os.path.exists(DEVICE_NODE):
+ self.skipTest("system is supported")
+ with self.assertRaises(FileNotFoundError) as error:
+ super().setUp()
+ self.assertEqual(error.exception.errno, 2)
+
+ def test_unauthenticated_nonce(self) -> None:
+ """fetch unauthenticated nonce"""
+ with self.assertRaises(ValueError) as error:
+ get_nonce(self.d, None)
+
+
+class TestInvalidIoctls(DynamicBoostControlTest):
+ def __init__(self, data) -> None:
+ self.data = invalid_param()
+ self.data.data = 1
+ super().__init__(data)
+
+ def setUp(self) -> None:
+ if not os.path.exists(DEVICE_NODE):
+ self.skipTest("system is unsupported")
+ if not ioctl:
+ self.skipTest("unable to test IOCTLs without ioctl_opt")
+
+ return super().setUp()
+
+ def test_invalid_nonce_ioctl(self) -> None:
+ """tries to call get_nonce ioctl with invalid data structures"""
+
+ # 0x1 (get nonce), and invalid data
+ INVALID1 = ioctl.IOWR(ord("D"), 0x01, invalid_param)
+ with self.assertRaises(OSError) as error:
+ fcntl.ioctl(self.d, INVALID1, self.data, True)
+ self.assertEqual(error.exception.errno, 22)
+
+ def test_invalid_setuid_ioctl(self) -> None:
+ """tries to call set_uid ioctl with invalid data structures"""
+
+ # 0x2 (set uid), and invalid data
+ INVALID2 = ioctl.IOW(ord("D"), 0x02, invalid_param)
+ with self.assertRaises(OSError) as error:
+ fcntl.ioctl(self.d, INVALID2, self.data, True)
+ self.assertEqual(error.exception.errno, 22)
+
+ def test_invalid_setuid_rw_ioctl(self) -> None:
+ """tries to call set_uid ioctl with invalid data structures"""
+
+ # 0x2 as RW (set uid), and invalid data
+ INVALID3 = ioctl.IOWR(ord("D"), 0x02, invalid_param)
+ with self.assertRaises(OSError) as error:
+ fcntl.ioctl(self.d, INVALID3, self.data, True)
+ self.assertEqual(error.exception.errno, 22)
+
+ def test_invalid_param_ioctl(self) -> None:
+ """tries to call param ioctl with invalid data structures"""
+ # 0x3 (param), and invalid data
+ INVALID4 = ioctl.IOWR(ord("D"), 0x03, invalid_param)
+ with self.assertRaises(OSError) as error:
+ fcntl.ioctl(self.d, INVALID4, self.data, True)
+ self.assertEqual(error.exception.errno, 22)
+
+ def test_invalid_call_ioctl(self) -> None:
+ """tries to call the DBC ioctl with invalid data structures"""
+ # 0x4, and invalid data
+ INVALID5 = ioctl.IOWR(ord("D"), 0x04, invalid_param)
+ with self.assertRaises(OSError) as error:
+ fcntl.ioctl(self.d, INVALID5, self.data, True)
+ self.assertEqual(error.exception.errno, 22)
+
+
+class TestInvalidSignature(DynamicBoostControlTest):
+ def setUp(self) -> None:
+ if not os.path.exists(DEVICE_NODE):
+ self.skipTest("system is unsupported")
+ if not system_is_secured():
+ self.skipTest("system is unfused")
+ return super().setUp()
+
+ def test_unauthenticated_nonce(self) -> None:
+ """fetch unauthenticated nonce"""
+ get_nonce(self.d, None)
+
+ def test_multiple_unauthenticated_nonce(self) -> None:
+ """ensure state machine always returns nonce"""
+ for count in range(0, 2):
+ get_nonce(self.d, None)
+
+ def test_authenticated_nonce(self) -> None:
+ """fetch authenticated nonce"""
+ with self.assertRaises(OSError) as error:
+ get_nonce(self.d, self.signature)
+ self.assertEqual(error.exception.errno, 1)
+
+ def test_set_uid(self) -> None:
+ """set uid"""
+ with self.assertRaises(OSError) as error:
+ set_uid(self.d, self.uid, self.signature)
+ self.assertEqual(error.exception.errno, 1)
+
+ def test_get_param(self) -> None:
+ """fetch a parameter"""
+ with self.assertRaises(OSError) as error:
+ process_param(self.d, PARAM_GET_SOC_PWR_CUR, self.signature)
+ self.assertEqual(error.exception.errno, 1)
+
+ def test_set_param(self) -> None:
+ """set a parameter"""
+ with self.assertRaises(OSError) as error:
+ process_param(self.d, PARAM_SET_PWR_CAP, self.signature, 1000)
+ self.assertEqual(error.exception.errno, 1)
+
+
+class TestUnFusedSystem(DynamicBoostControlTest):
+ def setup_identity(self) -> None:
+ """sets up the identity of the caller"""
+ # if already authenticated these may fail
+ try:
+ get_nonce(self.d, None)
+ except PermissionError:
+ pass
+ try:
+ set_uid(self.d, self.uid, self.signature)
+ except BlockingIOError:
+ pass
+ try:
+ get_nonce(self.d, self.signature)
+ except PermissionError:
+ pass
+
+ def setUp(self) -> None:
+ if not os.path.exists(DEVICE_NODE):
+ self.skipTest("system is unsupported")
+ if system_is_secured():
+ self.skipTest("system is fused")
+ super().setUp()
+ self.setup_identity()
+ time.sleep(SET_DELAY)
+
+ def test_get_valid_param(self) -> None:
+ """fetch all possible parameters"""
+ # SOC power
+ soc_power_max = process_param(self.d, PARAM_GET_SOC_PWR_MAX, self.signature)
+ soc_power_min = process_param(self.d, PARAM_GET_SOC_PWR_MIN, self.signature)
+ self.assertGreater(soc_power_max[0], soc_power_min[0])
+
+ # fmax
+ fmax_max = process_param(self.d, PARAM_GET_FMAX_MAX, self.signature)
+ fmax_min = process_param(self.d, PARAM_GET_FMAX_MIN, self.signature)
+ self.assertGreater(fmax_max[0], fmax_min[0])
+
+ # cap values
+ keys = {
+ "fmax-cap": PARAM_GET_FMAX_CAP,
+ "power-cap": PARAM_GET_PWR_CAP,
+ "current-temp": PARAM_GET_CURR_TEMP,
+ "soc-power-cur": PARAM_GET_SOC_PWR_CUR,
+ }
+ for k in keys:
+ result = process_param(self.d, keys[k], self.signature)
+ self.assertGreater(result[0], 0)
+
+ def test_get_invalid_param(self) -> None:
+ """fetch an invalid parameter"""
+ try:
+ set_uid(self.d, self.uid, self.signature)
+ except OSError:
+ pass
+ with self.assertRaises(OSError) as error:
+ process_param(self.d, (0xF,), self.signature)
+ self.assertEqual(error.exception.errno, 22)
+
+ def test_set_fmax(self) -> None:
+ """get/set fmax limit"""
+ # fetch current
+ original = process_param(self.d, PARAM_GET_FMAX_CAP, self.signature)
+
+ # set the fmax
+ target = original[0] - 100
+ process_param(self.d, PARAM_SET_FMAX_CAP, self.signature, target)
+ time.sleep(SET_DELAY)
+ new = process_param(self.d, PARAM_GET_FMAX_CAP, self.signature)
+ self.assertEqual(new[0], target)
+
+ # revert back to current
+ process_param(self.d, PARAM_SET_FMAX_CAP, self.signature, original[0])
+ time.sleep(SET_DELAY)
+ cur = process_param(self.d, PARAM_GET_FMAX_CAP, self.signature)
+ self.assertEqual(cur[0], original[0])
+
+ def test_set_power_cap(self) -> None:
+ """get/set power cap limit"""
+ # fetch current
+ original = process_param(self.d, PARAM_GET_PWR_CAP, self.signature)
+
+ # set the fmax
+ target = original[0] - 10
+ process_param(self.d, PARAM_SET_PWR_CAP, self.signature, target)
+ time.sleep(SET_DELAY)
+ new = process_param(self.d, PARAM_GET_PWR_CAP, self.signature)
+ self.assertEqual(new[0], target)
+
+ # revert back to current
+ process_param(self.d, PARAM_SET_PWR_CAP, self.signature, original[0])
+ time.sleep(SET_DELAY)
+ cur = process_param(self.d, PARAM_GET_PWR_CAP, self.signature)
+ self.assertEqual(cur[0], original[0])
+
+ def test_set_3d_graphics_mode(self) -> None:
+ """set/get 3d graphics mode"""
+ # these aren't currently implemented but may be some day
+ # they are *expected* to fail
+ with self.assertRaises(OSError) as error:
+ process_param(self.d, PARAM_GET_GFX_MODE, self.signature)
+ self.assertEqual(error.exception.errno, 2)
+
+ time.sleep(SET_DELAY)
+
+ with self.assertRaises(OSError) as error:
+ process_param(self.d, PARAM_SET_GFX_MODE, self.signature, 1)
+ self.assertEqual(error.exception.errno, 2)
+
+
+if __name__ == "__main__":
+ unittest.main()