# Unix SMB/CIFS implementation.
#
# samba-tool commands for Key Distribution Services
#
# Copyright © Catalyst.Net Ltd. 2024
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import samba.getopt as options
from ldb import SCOPE_SUBTREE, LdbError, MessageElement, binary_encode
from samba import string_is_guid
from samba.dcerpc import misc
from samba.netcmd import Command, CommandError, Option, SuperCommand
from samba.nt_time import (string_from_nt_time,
                           nt_time_from_string,
                           nt_now,
                           timedelta_from_nt_time_delta)
def root_key_base_dn(ldb):
    """Return the DN of the container that holds the root keys.

    The DN is the configuration base DN reported by *ldb*, extended
    with the "Master Root Keys" container path as a child.
    """
    relative_path = "CN=Master Root Keys,CN=Group Key Distribution Service,CN=Services"
    container_dn = ldb.get_config_basedn()
    container_dn.add_child(relative_path)
    return container_dn
def get_root_key_by_name_or_dn(ldb, name, attrs=None):
    """Find exactly one root key, identified by GUID name or by DN.

    :param ldb: database connection to search.
    :param name: identifies the key. If its string form is a GUID it
        is matched against the 'name' attribute, otherwise against 'dn'.
    :param attrs: list of attributes to fetch; defaults to all ('*').
    :return: the single matching message.
    :raises CommandError: if no key matches, or more than one does.
    """
    name_str = str(name)
    key = 'name' if string_is_guid(name_str) else 'dn'

    if attrs is None:
        attrs = ['*']

    base_dn = root_key_base_dn(ldb)

    # The name typically comes straight from the command line. Escape
    # it so that filter metacharacters ('*', '(', ')', '\') are matched
    # literally instead of changing the meaning of the search -- an
    # unescaped '*' would turn this into a match-anything filter.
    escaped_name = binary_encode(name_str)
    expression = ("(&(objectClass = msKds-ProvRootKey)"
                  f"({key} = {escaped_name}))")

    res = ldb.search(base_dn,
                     scope=SCOPE_SUBTREE,
                     expression=expression,
                     attrs=attrs)

    if len(res) == 0:
        raise CommandError(f"no such root key: {name}")
    if len(res) != 1:
        # the database is in a sorry state
        raise CommandError(f"duplicate root keys matching {name}")

    return res[0]
def get_sorted_root_keys(ldb, attrs=None, n=None):
    """Search for all root keys, sorted by the server on start time.

    The search carries a server_sort control on msKds-UseStartTime;
    the `view --latest` command relies on the first result being the
    latest key.

    :param ldb: database connection to search.
    :param attrs: list of attributes to fetch; defaults to all ('*').
    :param n: accepted but currently unused.
    :return: the search result.
    """
    wanted_attrs = ['*'] if attrs is None else attrs
    return ldb.search(root_key_base_dn(ldb),
                      scope=SCOPE_SUBTREE,
                      expression="(objectClass = msKds-ProvRootKey)",
                      attrs=wanted_attrs,
                      controls=["server_sort:1:1:msKds-UseStartTime"])
def delta_string(d):
    """Describe an NT time delta as an approximate, readable string.

    *d* is converted with timedelta_from_nt_time_delta(). Deltas of
    less than two seconds either way are 'about now'; zero or negative
    deltas are described as being in the past ('ago'), positive ones
    as being in the future.
    """
    seconds = timedelta_from_nt_time_delta(d).total_seconds()
    magnitude = abs(seconds)
    if magnitude < 2:
        return 'about now'

    # (exclusive upper bound in seconds, seconds per unit, unit name);
    # each unit is used until the count reaches two of the next unit.
    scales = (
        (120, 1, 'seconds'),
        (7200, 60, 'minutes'),
        (48 * 3600, 3600, 'hours'),
    )
    for limit, unit_seconds, unit in scales:
        if magnitude < limit:
            break
    else:
        unit_seconds, unit = 24 * 3600, 'days'

    count = int(magnitude / unit_seconds)
    direction = ' ago' if seconds <= 0 else ' in the FUTURE'
    return 'about ' + f'{count} {unit}' + direction
# These next ridiculously simple looking functions are for the
# ENCODERS mapping below.
def guid_to_string(v):
    """Return the string form of the misc.GUID constructed from *v*."""
    guid = misc.GUID(v)
    return str(guid)
def string_from_nt_time_string(nt_time):
    """Format an NT time that arrives in any form int() can convert."""
    return string_from_nt_time(int(nt_time))
# ENCODERS is a mapping of attribute names to encoding functions for
# the corresponding values. Anything not mentioned will go through
# str(), which for MessageElements is the same as bytes.decode().
ENCODERS = {
    # NT time values are rendered as time strings.
    "msKds-UseStartTime": string_from_nt_time_string,
    "msKds-CreateTime": string_from_nt_time_string,
    # Opaque binary blobs are shown as hex digits.
    "msKds-RootKeyData": bytes.hex,
    "msKds-SecretAgreementParam": bytes.hex,
    "objectGUID": guid_to_string,
    "msKds-KDFParam": bytes.hex,
    # These are converted with int(), so they come out as numbers.
    "msKds-PublicKeyLength": int,
    "msKds-PrivateKeyLength": int,
    "msKds-Version": int,
}
def encode_by_key(k, v):
    """Convert an attribute value into a printable form.

    The encoder is chosen by attribute name *k* from ENCODERS, falling
    back to str(). A MessageElement with one value is returned as a
    single encoded item; with any other number of values, as a list of
    encoded items. Anything that is not a MessageElement (probably a
    Dn) is passed to the encoder whole.
    """
    # NOTE(review): individual MessageElement values are presumably
    # bytes, for which the str() fallback gives the repr ("b'...'")
    # rather than decoded text -- confirm this is intended.
    encode = ENCODERS.get(k, str)

    if isinstance(v, MessageElement):
        if len(v) == 1:
            return encode(v[0])
        return [encode(item) for item in v]

    return encode(v)
# These attributes we normally want to show. 'name' is a GUID string
# (and has the same value as cn, the rdn).
# This must remain a list: the commands copy it and extend the copy
# with the other attribute lists.
BASE_ATTRS = ["name",
              "msKds-UseStartTime",
              "msKds-CreateTime",
              ]
# These attributes are secret, and also pretty opaque and useless to
# look at (unless you want to steal the secret). They are only
# displayed when secrets are explicitly asked for.
SECRET_ATTRS = ["msKds-RootKeyData",
                "msKds-SecretAgreementParam"]
# These are things you might want to look at, but generally don't.
# The 'create' and 'list' commands add them with --verbose; the 'view'
# command always asks for them.
VERBOSE_ATTRS = ["whenCreated",
                 "whenChanged",
                 "objectGUID",
                 "msKds-KDFAlgorithmID",
                 "msKds-KDFParam",
                 "msKds-SecretAgreementAlgorithmID",
                 "msKds-PublicKeyLength",
                 "msKds-PrivateKeyLength",
                 "msKds-Version",
                 "msKds-DomainID",
                 "cn",
                 ]
class RootKeyCommand(Command):
    """Base class with a common method for presenting root key data."""

    def show_root_key_message(self, msg,
                              output_format=None,
                              show_secrets=False,
                              preamble=None,
                              now=None):
        """Print one root key, either as JSON or as readable text.

        :param msg: the root key message to show.
        :param output_format: 'json' for JSON output; anything else
            gives the text form.
        :param show_secrets: if true, include the SECRET_ATTRS values.
        :param preamble: optional text shown first (in JSON, stored
            under the 'message' key).
        :param now: NT time that the relative times are measured from;
            defaults to the current time. Only used for text output.
        """
        if output_format == 'json':
            out = {} if preamble is None else {'message': preamble}
            out.update((k, encode_by_key(k, v))
                       for k, v in msg.items()
                       if show_secrets or k not in SECRET_ATTRS)
            self.print_json(out)
            return

        reference = nt_now() if now is None else now

        # Everything is read and formatted before anything is printed.
        created = int(msg['msKds-createTime'][0])
        usable_from = int(msg['msKds-UseStartTime'][0])
        created_delta = delta_string(created - reference)
        usable_delta = delta_string(usable_from - reference)

        if preamble is not None:
            self.message(preamble)

        self.message(f"name {msg['name']}")
        self.message(f" created {string_from_nt_time(created)} ({created_delta})")
        self.message(f" usable from {string_from_nt_time(usable_from)} ({usable_delta})")

        if show_secrets:
            for secret in SECRET_ATTRS:
                hex_value = msg[secret][0].hex()
                self.message(f" {secret:14} {hex_value}")

        # The base attributes were printed above and the secrets are
        # handled separately, so only the rest remains.
        handled = BASE_ATTRS + SECRET_ATTRS
        for attr in msg:
            if attr in handled:
                continue
            value = encode_by_key(attr, msg[attr])
            self.message(f" {attr:14} {value}")

        self.message('')
class cmd_domain_kds_root_key_create(RootKeyCommand):
    """Create a KDS root key object."""

    synopsis = "%prog [-H <URL>] [options]"

    takes_optiongroups = {
        "sambaopts": options.SambaOptions,
        "credopts": options.CredentialsOptions,
        "hostopts": options.HostOptions,
    }

    takes_options = [
        Option("--json", help="Output results in JSON format.",
               dest="output_format", action="store_const", const="json"),
        Option("--use-start-time", help="Use of the key begins at this time."),
        Option("-v", "--verbose", help="Be verbose", action="store_true"),
    ]

    def run(self, hostopts=None, sambaopts=None, credopts=None,
            output_format=None, use_start_time=None, verbose=None):
        """Create a root key and report its name and start time.

        :param use_start_time: optional time string for when use of
            the key begins; parsed with nt_time_from_string().
        :param output_format: 'json' for JSON output.
        :param verbose: if true, show the new key in full.
        :raises CommandError: if use_start_time cannot be parsed.
        """
        # The start time is validated before connecting, so a bad
        # value fails without touching the database.
        create_args = {}
        if use_start_time is not None:
            try:
                create_args['use_start_time'] = nt_time_from_string(use_start_time)
            except ValueError as e:
                raise CommandError(e) from None

        ldb = self.ldb_connect(hostopts, sambaopts, credopts)
        dn = ldb.new_gkdi_root_key(**create_args)
        guid = dn.get_rdn_value()

        attrs = BASE_ATTRS[:]
        if verbose:
            attrs += VERBOSE_ATTRS

        # Read the key back so the stored start time is what is reported.
        msg = get_root_key_by_name_or_dn(ldb, guid, attrs=attrs)
        start_time = int(msg['msKds-UseStartTime'][0])
        used_from_string = (f"usable from {string_from_nt_time(start_time)} "
                            f"({delta_string(start_time - nt_now())})")

        message = f"created root key {guid}, {used_from_string}"

        if verbose:
            self.show_root_key_message(msg,
                                       output_format,
                                       preamble=f"{message}\n")
        elif output_format == 'json':
            json_fields = {k: msg[k] for k in attrs}
            self.print_json_status(message=message, dn=str(dn), **json_fields)
        else:
            self.message(message)
class cmd_domain_kds_root_key_delete(RootKeyCommand):
    """Delete a KDS root key."""

    synopsis = "%prog [-H <URL>] [options]"

    takes_optiongroups = {
        "sambaopts": options.SambaOptions,
        "credopts": options.CredentialsOptions,
        "hostopts": options.HostOptions,
    }

    takes_options = [
        Option("--name", help="The key to delete"),
        Option("--json", help="Output results in JSON format.",
               dest="output_format", action="store_const", const="json"),
    ]

    def run(self, hostopts=None, sambaopts=None, credopts=None, name=None, output_format=None):
        """Delete the root key identified by --name.

        :param name: identifies the key to delete.
        :param output_format: 'json' for JSON output.
        :raises CommandError: if no name is given, the key cannot be
            found uniquely, or the database reports an error.
        """
        # Without this check the lookup would search for the literal
        # string "None" and report a confusing "no such root key".
        if name is None:
            raise CommandError("Please use '--name' to choose the key to delete")

        ldb = self.ldb_connect(hostopts, sambaopts, credopts)
        try:
            root_key = get_root_key_by_name_or_dn(ldb, name)
            ldb.delete(root_key.dn)
        except LdbError as e:
            raise CommandError(e)

        guid = root_key.dn.get_rdn_value()
        message = f"deleted root key {guid}"

        if output_format == 'json':
            # Pass the text by keyword, as the create command does, so
            # it cannot be taken for a different positional parameter.
            self.print_json_status(message=message)
        else:
            self.message(message)
class cmd_domain_kds_root_key_list(RootKeyCommand):
    """List KDS root keys."""

    synopsis = "%prog [-H <URL>] [options]"

    takes_optiongroups = {
        "sambaopts": options.SambaOptions,
        "credopts": options.CredentialsOptions,
        "hostopts": options.HostOptions,
    }

    takes_options = [
        Option("-v", "--verbose", help="Be verbose", action="store_true"),
        Option("--show-secrets", help="Show root key hash", action="store_true"),
        Option("--json", help="Output results in JSON format.",
               dest="output_format", action="store_const", const="json"),
    ]

    def run(self, hostopts=None, sambaopts=None, credopts=None, verbose=None,
            show_secrets=None, output_format=None):
        """List all root keys in the order the sorted search returns them.

        :param verbose: if true, also fetch and show VERBOSE_ATTRS.
        :param show_secrets: if true, also fetch and show SECRET_ATTRS.
        :param output_format: 'json' for a JSON list of keys.
        """
        ldb = self.ldb_connect(hostopts, sambaopts, credopts)

        # Only the requested attributes are fetched, so secrets never
        # reach the output unless --show-secrets was given.
        attrs = BASE_ATTRS[:]
        if show_secrets:
            attrs += SECRET_ATTRS
        if verbose:
            attrs += VERBOSE_ATTRS

        res = get_sorted_root_keys(ldb, attrs)

        if output_format == 'json':
            out = [{k: encode_by_key(k, v) for k, v in msg.items()}
                   for msg in res.msgs]
            self.print_json(out)
            return

        if len(res) == 0:
            self.message("no root keys found.")
            return

        self.message(f"{len(res)} root key{'s' if len(res) > 1 else ''} found.\n")

        # A single timestamp keeps the relative times consistent
        # across all of the keys.
        now = nt_now()
        for msg in res:
            self.show_root_key_message(msg,
                                       output_format,
                                       show_secrets=show_secrets,
                                       now=now)
            self.message('')
class cmd_domain_kds_root_key_view(RootKeyCommand):
    """View a root key object."""

    synopsis = "%prog [-H <URL>] [options]"

    takes_optiongroups = {
        "sambaopts": options.SambaOptions,
        "credopts": options.CredentialsOptions,
        "hostopts": options.HostOptions,
    }

    takes_options = [
        Option("--name", help="Choose the key to view (by GUID)"),
        Option("--latest", help="View the latest key", action="store_true"),
        Option("-v", "--verbose", help="Be verbose", action="store_true"),
        Option("--show-secrets", help="Show root key hash", action="store_true"),
        Option("--json", help="Output results in JSON format.",
               dest="output_format", action="store_const", const="json"),
    ]

    def run(self, hostopts=None, sambaopts=None, credopts=None,
            name=None, output_format=None, show_secrets=None, verbose=None,
            latest=None):
        """Show one root key, chosen by --name or by --latest.

        :param name: identifies the key to view.
        :param latest: if true, view the first key of the sorted search.
        :param show_secrets: if true, also show SECRET_ATTRS.
        :param verbose: if true, fetch all attributes.
        :param output_format: 'json' for JSON output.
        :raises CommandError: if --name and --latest are combined, if
            neither is given, or if no matching key exists.
        """
        # Reject impossible option combinations before connecting.
        if latest and name is not None:
            raise CommandError("It makes no sense to combine --name and --latest")
        if not latest and name is None:
            raise CommandError("Please use '--name <GUID>' or '--latest' "
                               "(try the 'list' command to find names)")

        ldb = self.ldb_connect(hostopts, sambaopts, credopts)

        # The default behaviour is to show quite a lot of information,
        # equal to that seen with `list --verbose`, but leaving out
        # uninteresting attributes like "showInAdvancedViewOnly" and
        # tautological ones like "objectClass".
        #
        #                          -> selected attributes
        # --show-secrets           -> selected attributes and secrets
        # --verbose                -> all attributes EXCEPT secrets
        # --verbose --show-secrets -> all attributes
        attrs = BASE_ATTRS + VERBOSE_ATTRS
        if show_secrets:
            attrs += SECRET_ATTRS
        if verbose:
            # "*" also fetches the secret attributes; they are left
            # out at display time unless show_secrets is set.
            attrs += ["*"]

        if latest:
            res = get_sorted_root_keys(ldb, attrs)
            if len(res) == 0:
                raise CommandError("no root keys found")
            msg = res[0]
        else:
            msg = get_root_key_by_name_or_dn(ldb, name, attrs)

        self.show_root_key_message(msg,
                                   output_format,
                                   show_secrets=show_secrets)
class cmd_domain_kds_root_key(SuperCommand):
    """Manage key distribution service root keys."""

    # Maps each subcommand name to an instance of the command class
    # that implements it.
    subcommands = {
        "create": cmd_domain_kds_root_key_create(),
        "delete": cmd_domain_kds_root_key_delete(),
        "list": cmd_domain_kds_root_key_list(),
        "view": cmd_domain_kds_root_key_view(),
    }