summaryrefslogtreecommitdiff
path: root/python/samba/tests/conditional_ace_assembler.py
blob: 486601bef3dfea780cffe671c4319cbf76b0a3ee (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# Unix SMB/CIFS implementation.
# Copyright © Catalyst IT 2023
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""Fine-grained control over conditional ACE contents.

This deliberately allows you to do broken things that SDDL doesn't.

- token sequences that make no real sense
- sequences that make sense which SDDL can't encode
- strings that aren't proper utf-16
- etc.
"""

import struct
from samba.dcerpc import security, conditional_ace as ca
from samba.ndr import ndr_pack


class Composite:
    token = ca.CONDITIONAL_ACE_TOKEN_COMPOSITE

    def __init__(self, *tokens):
        self.members = []
        for t in tokens:
            self.members.append(dwim_one_token(t))

    def __bytes__(self):
        v = []
        for x in self.members:
            v.extend(bytes(x))

        return (bytes([self.token]) +
                struct.pack("<I", len(v)) +
                bytes(v))


class Int:
    def __init__(self, value,
                 bits=ca.CONDITIONAL_ACE_TOKEN_INT64,
                 base=ca.CONDITIONAL_ACE_INT_BASE_10,
                 sign=ca.CONDITIONAL_ACE_INT_SIGN_NONE):
        self.value = value
        self.bits = int(bits)
        self.base = int(base)
        self.sign = int(sign)

    def __bytes__(self):
        n = struct.pack('<q', self.value)
        return bytes([self.bits]) + n + bytes([self.sign, self.base])


class String:
    """A string is decoded as UTF-16.
    Other iterables allows the insertion of arbitrary raw bytes."""
    token = ca.CONDITIONAL_ACE_TOKEN_UNICODE

    def __init__(self, value):
        if isinstance(value, str):
            value = value.encode('utf-16-le')
        self.value = list(value)

    def __bytes__(self):
        header = struct.pack('<BI', self.token, len(self.value))
        return header + bytes(self.value)


class LocalAttr(String):
    token = ca.CONDITIONAL_ACE_LOCAL_ATTRIBUTE


class UserAttr(String):
    token = ca.CONDITIONAL_ACE_USER_ATTRIBUTE


class DeviceAttr(String):
    token = ca.CONDITIONAL_ACE_DEVICE_ATTRIBUTE


class ResourceAttr(String):
    token = ca.CONDITIONAL_ACE_RESOURCE_ATTRIBUTE


class ByteString:
    """takes an iterable of 8-bit numbers, or a string."""
    token = ca.CONDITIONAL_ACE_TOKEN_OCTET_STRING

    def __init__(self, value):
        if isinstance(value, str):
            value = value.encode()
        self.value = bytes(value)
        if max(self.value) > 255 or min(self.value) < 0:
            raise ValueError("bytes do need to be bytes (0-255)")

    def __bytes__(self):
        header = struct.pack('<BI', self.token, len(self.value))
        return header + self.value


class SID:
    token = ca.CONDITIONAL_ACE_TOKEN_SID

    def __init__(self, sidstring):
        self.sid = security.domsid(sidstring)

    def __bytes__(self):
        value = ndr_pack(self.sid)
        header = struct.pack('B<I', self.token, len(value))
        return header + value


class Token:
    """To add a raw byte, like
    Token(ca.CONDITIONAL_ACE_TOKEN_COMPOSITE)
    """
    def __init__(self, v):
        self.token = v

    def __bytes__(self):
        return bytes([self.token])


def _add_tokens():
    for tok in dir(ca):
        if not tok[:22] == 'CONDITIONAL_ACE_TOKEN_':
            continue
        k = tok[22:]
        globals()[k] = Token(getattr(ca, tok))

_add_tokens()


def dwim_one_token(t):
    if isinstance(t, int):
        return Int(t)
    if isinstance(t, str):
        return String(t)
    if isinstance(t, tuple):
        return Composite(*t)
    if isinstance(t, bytes):
        return ByteString(t)

    return t


def assemble(*tokens):
    program = b'artx'
    if len(tokens) == 1 and isinstance(tokens, (list, tuple, set)):
        print("WARNING: single argument container will become a composite. "
              "you might have meant 'assemble(*args)', not 'assemble(args)'")

    for t in tokens:
        t = dwim_one_token(t)
        program += bytes(t)

    program += b'\x00\x00\x00'
    program = program[:-(len(program) & 3)]

    return program


def assemble_ace(tokens=[],
                 type=security.SEC_ACE_TYPE_ACCESS_ALLOWED_CALLBACK,
                 trustee=None,
                 flags=None,
                 object=None,
                 access_mask=None):
    type_strings = {
        'XA': security.SEC_ACE_TYPE_ACCESS_ALLOWED_CALLBACK,
        'XD': security.SEC_ACE_TYPE_ACCESS_DENIED_CALLBACK,
        'ZA': security.SEC_ACE_TYPE_ACCESS_ALLOWED_CALLBACK_OBJECT,
        # this can also make plain ACEs
        'A': security.SEC_ACE_TYPE_ACCESS_ALLOWED,
        'D': security.SEC_ACE_TYPE_ACCESS_DENIED,
    }

    a = security.ace()
    a.type = type_strings.get(type, type)
    if trustee is not None:
        a.trustee = trustee
    if flags is not None:
        a.flags = flags
    if object is not None:
        a.object = object
    if tokens:
        a.coda = assemble(*tokens)
    return a


def assemble_sd(base_sddl='D:',
                add_trailing_allow_ace=False,
                domain_sid=None,
                **ace_args):
    """Make a security descriptor using the base_sddl, then add the
    assembled conditional ACE on the end of its DACL. If
    add_trailing_allow_ace is true, an allow ace matching
    '(A;;0x1ff;;;WD)' is added to the end, allowing successful deny
    ACEs to be detected.
    """
    if domain_sid is None:
        domain_sid = security.dom_sid('S-1-2-3-4')

    sd = security.descriptor.from_sddl(base_sddl, domain_sid)
    ace = assemble_ace(**ace_args)
    sd.dacl_add(ace)
    if add_trailing_allow_ace:
        # If the compiled ACE is a deny ACE, we won't know if it
        # worked unless there is a wide ranging allow ACE following
        # it.
        allow_ace = assemble_ace(type='A',
                                 trustee=security.dom_sid(security.SID_WORLD),
                                 access_mask=security.SEC_FILE_ALL)
        sd.dacl_add(allow_ace)

    return sd