1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
|
# Unix SMB/CIFS implementation.
# Copyright (C) Catalyst.Net Ltd 2025
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""Functions for processing key_credential_link"""
import base64
from hashlib import sha256
import struct
import time
from typing import Optional, Union, Iterable
from cryptography.hazmat.primitives.serialization import (
load_der_public_key,
load_pem_public_key,
PublicFormat,
Encoding)
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
from cryptography.x509 import (
load_pem_x509_certificate,
load_der_x509_certificate)
from samba import nttime2unix
from samba.samdb import SamDB, BinaryDn
from samba.ndr import ndr_unpack, ndr_pack
from ldb import Dn
from samba.dcerpc import keycredlink, misc
class KeyCredLinkError(Exception):
"""The key credential link is inconsistent."""
# For bad values handed in, we use ValueError. For internal bad
# values, we use this.
def key_usage_string(i):
# there must be a better way.
for s in ('KEY_USAGE_NGC', 'KEY_USAGE_FIDO', 'KEY_USAGE_FEK',):
if i == getattr(keycredlink, s):
return s
return "unknown"
def nttime_as_date(nt):
secs = nttime2unix(nt)
ts = time.gmtime(secs)
return time.strftime('%Y-%m-%d %H:%M:%S', ts)
class KeyCredentialLinkDn(BinaryDn):
"""KeyCredentialLink attributes are stored as DN+Binary.
The binary part is a KEYCREDENTIALLINK_BLOB, which is basically an
array of KEYCREDENTIALLINK_ENTRY collectively describing a public
key.
Usually the DN refers to the object the KeyCredentialLink was
found on.
"""
# We make .binary a @property, so that BinaryDn's .parse() and
# .prefix just work without knowing that assigning to .binary is
# doing validation checks.
blob = None
@property
def binary(self) -> bytes:
"""The binary is stored as a keycredlink.KEYCREDENTIALLINK_BLOB"""
if self.blob is None:
return None
return ndr_pack(self.blob)
@binary.setter
def binary(self, value:bytes):
try:
self.blob = ndr_unpack(keycredlink.KEYCREDENTIALLINK_BLOB,
value)
except Exception as e:
raise ValueError("Could not parse value as KEYCREDENTIALLINK_BLOB "
f" (internal error: {e})")
def get_entry(self, entry_id):
if self.blob is None:
raise KeyCredLinkError("no key material")
for entry in self.blob.entries:
if entry.identifier == entry_id:
return entry.value
raise KeyCredLinkError(f"Key information entry {entry_id} not found")
def fingerprint(self) -> str:
"""The SHA256 of the key material in DER encoding, formatted
as hex pairs separated by colons ("hh:hh:...")"""
# A competing format is '2048 SHA256:<base64bytes>' (ssh style).
# This sha256 value should also be stored in the KeyID field.
data = self.get_entry(keycredlink.KeyMaterial)
hash = sha256(data).digest()
# Python 3.8+ will do this with hash.hex(':')
return ':'.join(f'{_:02X}' for _ in hash)
def description(self, verbosity=2) -> str:
"""Text describing key credential link characteristics.
verbosity is adjustable between 1 and 3.
"""
out = []
def write(msg, verbose_level=0):
if verbosity > verbose_level:
out.append(msg)
write(f'Link target: {self.dn}', 1)
write(f'Binary Dn: {self}', 2)
write(f'Key Credential Link Blob version: {self.blob.version}', 2)
write(f'Number of key entries: {self.blob.count}', 1)
write('Key entries:')
entries = []
longest = 0
for description, verbose_level, fn, attr in [
("key material fingerprint", 0,
lambda x: ':'.join(f"{_:02X}" for _ in x),
'KeyID'),
("key parameters fingerprint", 2,
lambda x: ':'.join(f"{_:02X}" for _ in x),
'KeyHash'),
("key usage", 1, key_usage_string, 'KeyUsage'),
("Device GUID", 1, misc.GUID, 'DeviceId'),
("last logon", 0, nttime_as_date,
'KeyApproximateLastLogonTimeStamp'),
("creation time", 0, nttime_as_date, 'KeyCreationTime'),
# for now we are ignoring KeySource and CustomKeyInformation
# KeyMaterial is decoded separately
]:
if verbosity > 1:
description = f"{description} ({attr})"
i = getattr(keycredlink, attr)
try:
entry = self.get_entry(i)
value = fn(entry)
except KeyCredLinkError:
value = "not found"
if verbosity > verbose_level:
entries.append((description, value))
longest = max(longest, len(description))
for desc, val in entries:
write(f" {desc + ':':{longest + 1}} {val}")
data = self.get_entry(keycredlink.KeyMaterial)
key = get_public_key(data, 'der')
write("RSA public key properties:", 1)
write(f" key size: {key.key_size}", 1)
write(f" fingerprint: {self.fingerprint()}", 1)
return '\n'.join(out)
def key_material(self) -> bytes:
return self.get_entry(keycredlink.KeyMaterial)
def as_pem(self) -> str:
"""Get the key out of the keycredlink blob, and return it in
PEM format as a string.
PEM is the ASCII format that starts '-----BEGIN PUBLIC KEY-----'.
"""
# The key is in DER format in an entry in the blob.
data = self.key_material()
key = get_public_key(data, 'der')
pem = key.public_bytes(Encoding.PEM,
PublicFormat.SubjectPublicKeyInfo)
return pem.decode()
def get_public_key(data:bytes, encoding:Optional[str] = None) -> RSAPublicKey:
"""decode a key in PEM or DER format.
If it turns out to be a certificate or something, we try to get
the public key from that.
So far only RSA keys are supported.
"""
if encoding is None:
if data[:11] == b'-----BEGIN ':
encoding = 'PEM'
else:
encoding = 'DER'
encoding = encoding.upper()
# The cryptography module also supports ssh keys, PKCS1, and other
# formats, as well as non-RSA keys and extracting public keys from
# private. It might not be wise to tolerate all of this, but we
# can do it by adding to key_fns and cert_fns here.
if encoding == 'PEM':
key_fns = [load_pem_public_key]
cert_fns = [load_pem_x509_certificate]
elif encoding == 'DER':
key_fns = [load_der_public_key]
cert_fns = [load_der_x509_certificate]
else:
raise ValueError(f"Public key encoding '{encoding}' not supported "
"(try 'PEM' or 'DER')")
key = None
for fn in key_fns:
try:
key = fn(data)
break
except ValueError:
continue
if key is None:
for fn in cert_fns:
try:
cert = fn(data)
key = cert.public_key()
break
except ValueError:
continue
if key is None:
raise ValueError("could not decode public key")
if not isinstance(key, RSAPublicKey):
raise ValueError("Currently only RSA Public Keys are supported "
f"(not '{key}')")
return key
def kcl_entry_bytes(entry_type:int, data:bytes) -> bytes:
"""helper to pack key credential link entries"""
return struct.pack('<HB', len(data), entry_type) + data
def create_key_credential_link(samdb: SamDB,
target: Union[str, Dn],
data: bytes,
encoding: Optional[str] = None,
force: bool = False):
"""Convert a public key in a common format into a binary DN"""
if not force:
res = samdb.search(base=target)
if len(res) == 0:
raise ValueError(f"link target {target} does not exist")
if encoding == 'auto':
encoding = None
key = get_public_key(data, encoding)
if key.key_size != 2048:
# According to [MS-ADTS] 2.2.20.5.1, KEY_USAGE_NGC means a
# 2048 bit public key.
if not force:
raise ValueError(f"2048 bit RSA key expected, not {key.key_size}")
key_bytes = key.public_bytes(Encoding.DER,
PublicFormat.SubjectPublicKeyInfo)
# that's the key.
# but there's more.
kcl_header = bytes.fromhex("00 02 00 00") # Always version 2
# Entries are added in the enum order, as follows.
#
# Here '**' means MUST exist, '*' means SHOULD, and '-' means
# SHOULD which we ignore. We ignore all the un-SHOULDed values
# ([MS-ADTS] 2.2.20.6). For KeyUsage, only use KEY_USAGE_NGC.
#
# ** 1 KeyID hash of the key material
# * 2 KeyHash hash of following entries (i.e. 3, 4, 9)
# ** 3 KeyMaterial the key
# ** 4 KeyUsage KEY_USAGE_NGC, KEY_USAGE_FIDO, or KEY_USAGE_FEK
# 5 KeySource KEY_SOURCE_AD.
# 6 DeviceId 16 byte device ID (GUID, I guess) or zeros
# 7 CustomKeyInformation CUSTOM_KEY_INFORMATION struct
# - 8 KeyApproximateLastLogonTimeStamp nttime
# * 9 KeyCreationTime nttime
# sha256 of the actual key
kcl_key_id = kcl_entry_bytes(keycredlink.KeyID,
sha256(key_bytes).digest())
# the actual key
kcl_material = kcl_entry_bytes(keycredlink.KeyMaterial,
key_bytes)
# always KEY_USAGE_NGC
kcl_key_usage = kcl_entry_bytes(keycredlink.KeyUsage,
keycredlink.KEY_USAGE_NGC.to_bytes(1, byteorder='big'))
# nttime for now
kcl_creation = kcl_entry_bytes(keycredlink.KeyCreationTime,
struct.pack('<Q', samdb.get_nttime()))
# always KEY_SOURCE_AD
#kcl_key_source = kcl_entry_bytes(keycredlink.KeySource,
# KEY_SOURCE_AD.to_bytes())
# the KeyHash field is a sha256 of all the values after the
# KeyHash field.
kcl_key_hash = kcl_entry_bytes(keycredlink.KeyHash,
sha256(kcl_material +
kcl_key_usage +
kcl_creation).digest())
kcl_bytes = (kcl_header +
kcl_key_id +
kcl_key_hash +
kcl_material +
kcl_key_usage +
kcl_creation)
k = KeyCredentialLinkDn.from_bytes_and_dn(samdb, kcl_bytes, target)
return k
def kcl_in_list(kcl: KeyCredentialLinkDn, others: Iterable[KeyCredentialLinkDn]):
"""True if kcl is in the list, otherwise False, disregarding
everything except key material and DN for the comparison.
"""
# this helps us avoid duplicate key credential links, which are
# otherwise disallowed only if all fields are identical, but which
# are generally useless.
km = kcl.key_material()
for other in others:
if km == other.key_material() and kcl.dn == other.dn:
return True
return False
def filter_kcl_list(samdb: SamDB,
keycredlinks: Iterable[KeyCredentialLinkDn],
link_target: Optional[str] = None,
fingerprint: Optional[str] = None) -> list:
"""Select only the input links that match at least one of the
criteria.
"""
# used in samba-tool X keytrust delete
selected = []
filters = []
if link_target is not None:
target_dn = Dn(samdb, link_target)
filters.append(lambda x: x.dn == target_dn)
if fingerprint is not None:
fingerprint = fingerprint.upper()
filters.append(lambda x: x.fingerprint() == fingerprint)
for x in keycredlinks:
for fn in filters:
if fn(x):
selected.append(x)
break
return selected
|