diff options
| author | Douglas Bagnall <douglas.bagnall@catalyst.net.nz> | 2025-08-06 14:01:14 +1200 |
|---|---|---|
| committer | Douglas Bagnall <dbagnall@samba.org> | 2025-08-20 04:34:37 +0000 |
| commit | 2681fe5df87db07b080e12fa7aeaaea0a0518546 (patch) | |
| tree | fe596360e06601b8f113c5ebe61350b87df2607a /python | |
| parent | 625cabf65140ee2c79b0a89c483edd071d58a4f4 (diff) | |
| download | samba-2681fe5df87db07b080e12fa7aeaaea0a0518546.tar.gz samba-2681fe5df87db07b080e12fa7aeaaea0a0518546.tar.bz2 samba-2681fe5df87db07b080e12fa7aeaaea0a0518546.zip | |
samba-tool: add user keytrust command
This allows manipulation of key credential links for users.
See `man -l bin/default/docs-xml/manpages/samba-tool.8` for
documentation.
Signed-off-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: Gary Lockyer <gary@catalyst.net.nz>
Diffstat (limited to 'python')
| -rw-r--r-- | python/samba/netcmd/user/__init__.py | 2 | ||||
| -rw-r--r-- | python/samba/netcmd/user/keytrust.py | 223 | ||||
| -rw-r--r-- | python/samba/tests/samba_tool/user_keytrust.py | 360 |
3 files changed, 585 insertions, 0 deletions
diff --git a/python/samba/netcmd/user/__init__.py b/python/samba/netcmd/user/__init__.py index fab657c2278..e73f2d323e0 100644 --- a/python/samba/netcmd/user/__init__.py +++ b/python/samba/netcmd/user/__init__.py @@ -27,6 +27,7 @@ from .disable import cmd_user_disable from .edit import cmd_user_edit from .enable import cmd_user_enable from .getgroups import cmd_user_getgroups +from .keytrust import cmd_user_keytrust from .list import cmd_user_list from .move import cmd_user_move from .password import cmd_user_password @@ -52,6 +53,7 @@ class cmd_user(SuperCommand): subcommands["delete"] = cmd_user_delete() subcommands["disable"] = cmd_user_disable() subcommands["enable"] = cmd_user_enable() + subcommands["keytrust"] = cmd_user_keytrust() subcommands["list"] = cmd_user_list() subcommands["setexpiry"] = cmd_user_setexpiry() subcommands["password"] = cmd_user_password() diff --git a/python/samba/netcmd/user/keytrust.py b/python/samba/netcmd/user/keytrust.py new file mode 100644 index 00000000000..c72e304d9cd --- /dev/null +++ b/python/samba/netcmd/user/keytrust.py @@ -0,0 +1,223 @@ +# samba-tool commands to manager Key Credential Links on a user +# +# Copyright © Douglas Bagnall <dbagnall@samba.org> 2025 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import ldb +import samba.getopt as options +from samba.domain.models import User +from samba.domain.models.exceptions import ModelError +from samba.netcmd import Command, CommandError, Option, SuperCommand +from samba.netcmd import exception_to_command_error +from samba.key_credential_link import (create_key_credential_link, + kcl_in_list, + filter_kcl_list) + + +class cmd_user_keycredentiallink_add(Command): + """Add a key-credential-link.""" + + synopsis = "%prog <username> [options] <pubkey>" + + takes_args = ["username", "pubkey"] + + takes_optiongroups = { + "sambaopts": options.SambaOptions, + "credopts": options.CredentialsOptions, + "hostopts": options.HostOptions, + } + + takes_options = [ + Option("--link-target", metavar="DN", + help="link to this DN (default: this user's DN)"), + Option("--encoding", default='auto', choices=('pem', 'der', 'auto'), + help="Key format (optional)"), + Option("--force", default=False, action='store_true', + help="proceed with operations that seems ill-fated"), + ] + + @exception_to_command_error(ValueError, ModelError, FileNotFoundError) + def run(self, username, pubkey, + hostopts=None, sambaopts=None, credopts=None, + link_target=None, encoding='auto', force=False): + + samdb = self.ldb_connect(hostopts, sambaopts, credopts) + user = User.find(samdb, username) + + if link_target is None: + link_target = user.dn + + with open(pubkey, 'rb') as f: + data = f.read() + + try: + link = create_key_credential_link(samdb, + link_target, + data, + encoding=encoding, + force=force) + except ldb.LdbError as e: + # with --force, we will end up with CONSTRAINT_VIOLATION + # at user.save(), rather than NO_SUCH_OBJECT now. + if e.args[0] == ldb.ERR_NO_SUCH_OBJECT: + raise CommandError(f"Link target '{link_target}' does not exist") + raise + + if not force and kcl_in_list(link, user.key_credential_link): + # It is not allowed to have duplicate linked attributes, + # which in the case of key credential links means having + # the same key blob and the same DN target. + # + # It is still possible to have the same key material and + # DN target if other fields (e.g. creation date) in the + # blob differ. The creation date is set with one second + # resolution in create_key_credential_link() just above, + # which puts us in the awkward position of creating a race + # if people are running samba-tool in a script. + # + # While the uniqueness invariant is a feature of AD/DSDB, + # not of key credential links, duplicates are not going to + # be useful, so we try to avoid this by checking first + # unless --force is used. + # + # if --force is used to add a key for the second time in + # the same second, user.save() below will raise an + # ERR_ATTRIBUTE_OR_VALUE_EXISTS LdbError. + raise CommandError(f"User {username} " + "already has this key credential link") + + user.key_credential_link.append(link) + user.save(samdb) + + +class cmd_user_keycredentiallink_delete(Command): + """Delete a key-credential-link.""" + + synopsis = "%prog <username> [options]" + + takes_args = ["username"] + + takes_options = [ + Option("--link-target", metavar="DN", + help="Delete this key credential link (a DN)"), + Option("--fingerprint", metavar="HH:HH:..", + help="Delete the key credential link with this key fingerprint"), + Option("--all", action='store_true', + help="Delete all key credential links"), + Option("-n", "--dry-run", action='store_true', + help="Do nothing but print what would happen"), + ] + + takes_optiongroups = { + "sambaopts": options.SambaOptions, + "credopts": options.CredentialsOptions, + "hostopts": options.HostOptions, + } + + @exception_to_command_error(ValueError, ModelError) + def run(self, username, hostopts=None, sambaopts=None, credopts=None, + link_target=None, fingerprint=None, all=False, dry_run=False): + + samdb = self.ldb_connect(hostopts, sambaopts, credopts) + user = User.find(samdb, username) + + keycredlinks = user.key_credential_link + + if all: + goners = keycredlinks + else: + goners = filter_kcl_list(samdb, + keycredlinks, + link_target=link_target, + fingerprint=fingerprint) + + keepers = [x for x in keycredlinks if x not in goners] + nk = len(keepers) + + if dry_run: + self.message("Without --dry-run, this would happen:") + if not goners: + self.message("NO key credential links are deleted") + for x in goners: + self.message(f"DELETE {x} (fingerprint {x.fingerprint()})") + self.message('') + for x in keepers: + self.message(f"KEEP {x} (fingerprint {x.fingerprint()})") + + self.message(f"{username} would now have {nk} key credential link" + f"{'' if nk == 1 else 's'}") + return + + if not goners: + # fail without traceback if the filter matches no links + raise CommandError("no key credential links deleted") + + user.key_credential_link = keepers + user.save(samdb) + + for x in goners: + self.message(f"Deleted {x} (fingerprint {x.fingerprint()})") + self.message('') + for x in keepers: + self.message(f"Keeping {x} (fingerprint {x.fingerprint()})") + + self.message(f"{username} now has {nk} key credential link" + f"{'' if nk == 1 else 's'}") + + +class cmd_user_keycredentiallink_view(Command): + """View a user's key credential links.""" + synopsis = "%prog <username> [options]" + + takes_args = ["username"] + + takes_options = [ + Option("-v", "--verbose", help="Be verbose", action="store_true"), + ] + + takes_optiongroups = { + "sambaopts": options.SambaOptions, + "credopts": options.CredentialsOptions, + "hostopts": options.HostOptions, + } + + @exception_to_command_error(ValueError, ModelError) + def run(self, username, hostopts=None, sambaopts=None, credopts=None, + verbose=False): + + samdb = self.ldb_connect(hostopts, sambaopts, credopts) + user = User.find(samdb, username) + + if verbose: + verbosity = 3 + else: + verbosity = 2 + + n = len(user.key_credential_link) + self.message(f"{username} has {n} key credential link" + f"{'' if n == 1 else 's'}\n") + + for kcl in user.key_credential_link: + self.message(kcl.description(verbosity), '') + + +class cmd_user_keytrust(SuperCommand): + """Manage key-credential links on a user.""" + + subcommands = { + "add": cmd_user_keycredentiallink_add(), + "delete": cmd_user_keycredentiallink_delete(), + "view": cmd_user_keycredentiallink_view(), + } diff --git a/python/samba/tests/samba_tool/user_keytrust.py b/python/samba/tests/samba_tool/user_keytrust.py new file mode 100644 index 00000000000..bcc587be32f --- /dev/null +++ b/python/samba/tests/samba_tool/user_keytrust.py @@ -0,0 +1,360 @@ +# Unix SMB/CIFS implementation. +# +# Tests for `samba-tool user keytrust` +# +# Copyright © Douglas Bagnall <dbagnall@samba.org> 2025 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +from pathlib import Path + +from samba.domain.models import User +from samba.tests.samba_tool.base import SambaToolCmdTest +from samba import key_credential_link as kcl + + +HOST = "ldap://{DC_SERVER}".format(**os.environ) +CREDS = "-U{DC_USERNAME}%{DC_PASSWORD}".format(**os.environ) + +ROOT = (Path(__file__) / '../../../../../').resolve() +TESTDATA = ROOT / 'testdata' / 'keytrust' + +GOOD_CERTS = [ + str(TESTDATA / 'cert-rsa-2048.pem'), + str(TESTDATA / 'ca-cert-rsa-2048.pem'), +] + +WRONG_SIZE_CERTS = [ + str(TESTDATA / 'cert-rsa-1024.pem'), + str(TESTDATA / 'ca-cert-rsa-4096.pem'), +] + +NON_RSA_CERTS = [ + str(TESTDATA / 'ca-cert-ecdsa-p256.pem'), +] + +GOOD_KEYS = [ + str(TESTDATA / 'rsa2048-pkcs1.der'), + str(TESTDATA / 'rsa2048b-spki.pem'), +] + +DUPLICATE_KEYS = [ + str(TESTDATA / 'cert-rsa-2048.pem'), + str(TESTDATA / 'public-key-from-cert-rsa-2048-pkcs1.pem'), +] + + +class SambaToolUserKeyTrustTest(SambaToolCmdTest): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.samdb = cls.getSamDB("-H", HOST, CREDS) + cls.runcmd("user", "key-trust", "delete", + "-H", HOST, CREDS, + 'joe', '--all') + cls.runcmd("user", "key-trust", "delete", + "-H", HOST, CREDS, + 'alice', '--all') + + def get_links(self, username): + result = self.samdb.search(expression=f'sAMAccountName={username}', + attrs=['msDS-KeyCredentialLink']) + self.assertEqual(len(result), 1) + links = result[0].get('msDS-KeyCredentialLink', []) + return [kcl.KeyCredentialLinkDn(self.samdb, v) for v in links] + + def test_add_good_cert(self): + """These ones should just succeed.""" + links = self.get_links('joe') + n = len(links) + for f in GOOD_CERTS: + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + 'joe', f) + self.assertCmdSuccess(result, out, err) + + n += 1 + links = self.get_links('joe') + self.assertEqual(len(links), n) + + result, out, err = self.runcmd("user", "key-trust", "delete", + "-H", HOST, CREDS, + 'joe', '--all') + self.assertCmdSuccess(result, out, err) + + for link in links: + self.assertIn(f"Deleted {link}", out) + + links = self.get_links('joe') + self.assertEqual(links, []) + + def test_add_and_delete_good_keys(self): + """Add known good keys, and also check the view and delete commands.""" + links = self.get_links('alice') + self.assertEqual(links, []) + + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + 'alice', GOOD_KEYS[0]) + self.assertCmdSuccess(result, out, err) + links = self.get_links('alice') + self.assertEqual(len(links), 1) + + result, out, err = self.runcmd("user", "key-trust", "view", + "-H", HOST, CREDS, + 'alice') + self.assertCmdSuccess(result, out, err) + self.assertIn('alice has 1 key credential link\n', out) + self.assertIn('Link target: CN=alice,CN=Users,DC=addom,DC=samba,DC=example,DC=com\n', out) + self.assertIn('Number of key entries: 5', out) + + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + 'alice', GOOD_KEYS[1]) + self.assertCmdSuccess(result, out, err) + result, out, err = self.runcmd("user", "key-trust", "view", + "-H", HOST, CREDS, + 'alice', '--verbose') + self.assertCmdSuccess(result, out, err) + self.assertIn('alice has 2 key credential links\n', out) + + links = self.get_links('alice') + fingerprints = [('16:CD:1B:C2:7A:0B:FC:C9:4B:95:11:9F:AD:97:EC:1B:' + 'ED:BD:64:91:42:2E:AF:CA:CB:1E:C3:EE:86:6D:F1:5A'), + ('86:61:6D:B2:6A:3A:04:BD:E0:59:10:13:21:9A:2B:2C:' + 'C4:FD:CE:50:05:16:3C:66:1B:38:63:79:8C:B1:DA:17')] + + self.assertEqual(set(x.fingerprint() for x in links), + set(fingerprints)) + + # test delete --dry-run / -n + result, out, err = self.runcmd("user", "key-trust", "delete", + "-H", HOST, CREDS, + 'alice', '--all', '-n') + self.assertCmdSuccess(result, out, err) + self.assertIn('Without --dry-run, this would happen:\n', out) + self.assertIn(f'DELETE {links[0]} (fingerprint {links[0].fingerprint()})', + out) + self.assertIn(f'DELETE {links[1]} (fingerprint {links[1].fingerprint()})', + out) + self.assertNotIn('KEEP', out) + self.assertIn('alice would now have 0 key credential links\n', out) + + result, out, err = self.runcmd("user", "key-trust", "delete", + "-H", HOST, CREDS, + 'alice', '--fingerprint=whatever', + '--dry-run') + self.assertCmdSuccess(result, out, err) + self.assertIn('NO key credential links are deleted\n', out) + + self.assertIn(f'KEEP {links[0]} (fingerprint {links[0].fingerprint()})', + out) + self.assertIn(f'KEEP {links[1]} (fingerprint {links[1].fingerprint()})', + out) + self.assertIn('alice would now have 2 key credential links\n', out) + + result, out, err = self.runcmd("user", "key-trust", "delete", + "-H", HOST, CREDS, + 'alice', + '--fingerprint', + fingerprints[1], + '--dry-run') + self.assertCmdSuccess(result, out, err) + self.assertIn(f'DELETE {links[1]} (fingerprint {links[1].fingerprint()})', + out) + self.assertIn(f'KEEP {links[0]} (fingerprint {links[0].fingerprint()})', + out) + self.assertIn('alice would now have 1 key credential link\n', out) + + # this time deleting for real + result, out, err = self.runcmd("user", "key-trust", "delete", + "-H", HOST, CREDS, + 'alice', '--all') + self.assertCmdSuccess(result, out, err) + links = self.get_links('alice') + self.assertEqual(links, []) + + result, out, err = self.runcmd("user", "key-trust", "view", + "-H", HOST, CREDS, + 'alice') + self.assertCmdSuccess(result, out, err) + self.assertIn('alice has 0 key credential links\n', out) + + def test_add_duplicate_keys(self): + """You should not be able to add the same link twice.""" + + self.addCleanup(self.runcmd, "user", "key-trust", "delete", + "-H", HOST, CREDS, + 'alice', '--all') + + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + 'alice', DUPLICATE_KEYS[0]) + self.assertCmdSuccess(result, out, err) + + # This source file is different, but contains the same public + # key. samba-tool should notice this and fail *before* it + # fails in the dsdb layer with ERR_ATTRIBUTE_OR_VALUE_EXISTS + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + 'alice', DUPLICATE_KEYS[1]) + self.assertCmdFail(result) + self.assertNotIn('ATTRIBUTE_OR_VALUE_EXISTS', err) + + # adding the first file again should also fail. + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + 'alice', DUPLICATE_KEYS[0]) + self.assertCmdFail(result) + + # adding to a different DN is OK + base_dn = self.samdb.domain_dn() + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + "--link-target", base_dn, + 'alice', DUPLICATE_KEYS[1]) + self.assertCmdSuccess(result, out, err) + + self.assertEqual(len(self.get_links('alice')), 2) + + def test_add_wrong_size_keys(self): + """You should not be able to add the same link twice.""" + + self.addCleanup(self.runcmd, "user", "key-trust", "delete", + "-H", HOST, CREDS, + 'joe', '--all') + + for fn in WRONG_SIZE_CERTS: + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + 'joe', fn) + self.assertCmdFail(result) + self.assertIn('ERROR: 2048 bit RSA key expected, not', err) + + self.assertEqual(self.get_links('joe'), []) + + for fn in WRONG_SIZE_CERTS: + # it will work with --force + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + '--force', + 'joe', fn) + + self.assertCmdSuccess(result, out, err) + + self.assertEqual(len(self.get_links('joe')), 2) + + def test_add_non_rsa_keys(self): + """You should not be able to add the same link twice.""" + + self.addCleanup(self.runcmd, "user", "key-trust", "delete", + "-H", HOST, CREDS, + 'joe', '--all') + + for fn in NON_RSA_CERTS: + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + 'joe', fn) + self.assertCmdFail(result) + self.assertIn('only RSA Public Keys are supported', err) + + self.assertEqual(self.get_links('joe'), []) + + for fn in NON_RSA_CERTS: + # it will NOT work with --force + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + '--force', + 'joe', fn) + + self.assertCmdFail(result) + self.assertIn('only RSA Public Keys are supported', err) + + self.assertEqual(self.get_links('joe'), []) + + def test_add_good_cert_bad_dn(self): + """Fails differently with --force""" + links = self.get_links('joe') + n = len(links) + target = f"CN=an RDN that is not there,{self.samdb.domain_dn()}" + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + '--link-target', target, + 'joe', GOOD_CERTS[0]) + self.assertCmdFail(result) + self.assertIn(f"ERROR: Link target '{target}' does not exist", err) + self.assertEqual(len(links), 0) + + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + '--link-target', target, + '--force', + 'joe', GOOD_CERTS[1]) + self.assertCmdFail(result) + self.assertIn("ERROR(ldb): uncaught exception", err) + self.assertIn("LDAP_CONSTRAINT_VIOLATION", err) + self.assertEqual(len(links), 0) + + def test_add_good_cert_bad_encoding(self): + """If we use --encoding=pem with a DER file or vice versa, it + should fail.""" + self.addCleanup(self.runcmd, "user", "key-trust", "delete", + "-H", HOST, CREDS, + 'joe', '--all') + + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + '--encoding', 'der', + 'joe', GOOD_CERTS[0]) + self.assertCmdFail(result) + self.assertIn("ERROR: could not decode public key", err) + self.assertEqual(self.get_links('joe'), []) + + # try to --force this one, to no avail + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + '--force', + '--encoding', 'pem', + 'joe', GOOD_KEYS[0]) + self.assertCmdFail(result) + self.assertIn("ERROR: could not decode public key", err) + self.assertEqual(self.get_links('joe'), []) + + with self.assertRaises(SystemExit): + # we can't catch result and output here because it fails + # in optparse which prints straight to stderr. + self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + '--encoding', 'pineapple', + 'joe', GOOD_CERTS[1]) + self.assertCmdFail(result) + self.assertEqual(self.get_links('joe'), []) + + # right encoding + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + '--encoding', 'pem', + 'joe', GOOD_CERTS[1]) + self.assertCmdSuccess(result, out, err) + self.assertEqual(len(self.get_links('joe')), 1) + + # 'auto' encoding + result, out, err = self.runcmd("user", "key-trust", "add", + "-H", HOST, CREDS, + '--encoding', 'auto', + 'joe', GOOD_CERTS[0]) + self.assertCmdSuccess(result, out, err) + self.assertEqual(len(self.get_links('joe')), 2) |
