# Unix SMB/CIFS implementation.
# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import print_function
"""Tests for the Auth and AuthZ logging.
"""
import samba.tests
from samba.dcerpc import srvsvc, dnsserver
import os
from samba.samba3 import libsmb_samba_internal as libsmb
from samba.samba3 import param as s3param
from samba.samdb import SamDB
import samba.tests.auth_log_base
from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS
from samba import NTSTATUSError
from subprocess import call
from ldb import LdbError
from samba.dcerpc.windows_event_ids import (
EVT_ID_SUCCESSFUL_LOGON,
EVT_ID_UNSUCCESSFUL_LOGON,
EVT_LOGON_NETWORK,
EVT_LOGON_INTERACTIVE,
EVT_LOGON_NETWORK_CLEAR_TEXT
)
import re
class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase):
def setUp(self):
    """Run the base-class set-up, then record the client address.

    The address is taken from the CLIENT_IP environment variable; a
    KeyError is raised if that variable is not set.
    """
    super(AuthLogTests, self).setUp()
    client_ip = os.environ["CLIENT_IP"]
    self.remoteAddress = client_ip
def tearDown(self):
    """Hand clean-up over to the base class; nothing extra is released."""
    parent = super(AuthLogTests, self)
    parent.tearDown()
def smb_connection(self, creds, use_spnego="yes", ntlmv2_auth="yes",
                   force_smb1=False):
    """Open an SMB connection to the "sysvol" share on self.server.

    creds       -- credentials handed to libsmb.Conn
    use_spnego  -- value set for the "client use spnego" option
    ntlmv2_auth -- value set for the "client ntlmv2 auth" option
    force_smb1  -- forwarded unchanged to libsmb.Conn

    Returns the libsmb.Conn object.
    """
    # The SMB bindings need an s3 loadparm context, so load one from the
    # same configuration file the test's own loadparm uses.
    test_lp = self.get_loadparm()
    s3_context = s3param.get_context()
    s3_context.load(test_lp.configfile)

    # Per-call overrides: lets a testcase skip SPNEGO or use NTLMv1.
    overrides = (
        ("client use spnego", use_spnego),
        ("client ntlmv2 auth", ntlmv2_auth),
    )
    for option, value in overrides:
        s3_context.set(option, value)

    connection = libsmb.Conn(self.server, "sysvol", lp=s3_context,
                             creds=creds, force_smb1=force_smb1)
    return connection
def _test_rpc_ncacn_np(self, authTypes, creds, service,
binding, protection, checkFunction):
def isLastExpectedMessage(msg):
return (msg["type"] == "Authorization" and
(msg["Authorization"]["serviceDescription"] == "DCE/RPC" or
msg["Authorization"]["serviceDescription"] == service) and
msg["Authorization"]["authType"] == authTypes[0] and
msg["Authorization"]["transportProtection"] == protection)
if binding:
binding = "[%s]" % binding
if service == "dnsserver":
x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding),
self.get_loadparm(),
creds)
elif service == "srvsvc":
x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding),
self.get_loadparm(),
creds)
# The connection is passed to ensure the server
# messaging context stays up until all the messages have been received.
messages = self.waitForMessages(isLastExpectedMessage, x)
checkFunction(messages, authTypes, service, binding, protection)
def _assert_ncacn_np_serviceDescription(self, binding, serviceDescription):
# Turn "[foo,bar]" into a list ("foo", "bar") to test
# lambda x: x removes anything that evaluates to False,
# including empty strings, so we handle "" as well
binding_list = \
list(filter(lambda x: x, re.compile('[\[,\]]').split(binding)))
# Handle explicit smb2, smb1 or auto negotiation
if "smb2" in binding_list:
self.assertEquals(serviceDescription, "SMB2")
elif "smb1" in binding_list:
self.assertEquals(serviceDescription, "SMB")
else:
self.assertIn(serviceDescription, ["SMB", "SMB2"])
def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
                            binding, protection):
    """Validate the messages logged for an NTLM ncacn_np connection.

    messages   -- the collected log messages, in order of arrival
    authTypes  -- expected values; its length is the expected number of
                  messages. authTypes[1] is the authDescription of the
                  first Authentication, authTypes[2] the authType of the
                  first Authorization and authTypes[3] the
                  authDescription of the second Authentication (only
                  checked when four messages are expected)
    service    -- RPC service name accepted as serviceDescription of the
                  second Authentication, alongside "DCE/RPC"
    binding    -- bracketed binding options, used to decide which SMB
                  serviceDescription is acceptable
    protection -- unused here; the first Authorization is always
                  expected to report "SMB" as transportProtection
    """
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authentication
    msg = messages[0]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual(
        EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
    self.assertEqual(
        EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
    self._assert_ncacn_np_serviceDescription(
        binding, msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[1],
                     msg["Authentication"]["authDescription"])

    # Check the second message it should be an Authorization
    msg = messages[1]
    self.assertEqual("Authorization", msg["type"])
    self._assert_ncacn_np_serviceDescription(
        binding, msg["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[2], msg["Authorization"]["authType"])
    self.assertEqual("SMB", msg["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))

    # Check the third message it should be an Authentication
    # if we are expecting 4 messages
    if expected_messages == 4:
        msg = messages[2]
        self.assertEqual("Authentication", msg["type"])
        self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
        # assertIn reports the offending value on failure, unlike
        # assertTrue on a boolean helper.
        self.assertIn(msg["Authentication"]["serviceDescription"],
                      ("DCE/RPC", service))
        self.assertEqual(authTypes[3],
                         msg["Authentication"]["authDescription"])
        self.assertEqual(
            EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
        self.assertEqual(
            EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
def rpc_ncacn_np_krb5_check(
self,
messages,
authTypes,
service,
binding,
protection):
expected_messages = len(authTypes)
self.assertEquals(expected_messages,
len(messages),
"Did not receive the expected number of messages")
# Check the first message it should be an Authentication
# This is almost certainly Authentication over UDP, and is probably
# returning message too big,
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
msg["Authentication"]["serviceDescription"])
self.assertEquals(authTypes[1],
msg["Authentication"]["authDescription"])
self.assertEquals(
EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
self.assertEquals(
EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
# Check the second message it should be an Authentication
# This is the TCP Authentication in response to the message too big
# response to the UDP Authentication
msg = messages[1]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
msg["Aut
|