summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorDouglas Bagnall <douglas.bagnall@catalyst.net.nz>2022-05-26 17:19:51 +1200
committerAndrew Bartlett <abartlet@samba.org>2022-06-17 01:28:30 +0000
commitc668b5caa92642fcb8feff423cee4ca67672dac6 (patch)
treee5bada1474853de71223aef64a290bc199289a34 /python
parent66cabb8fd1c756794f99f3cb33a7015d36b5ff03 (diff)
downloadsamba-c668b5caa92642fcb8feff423cee4ca67672dac6.tar.gz
samba-c668b5caa92642fcb8feff423cee4ca67672dac6.tar.bz2
samba-c668b5caa92642fcb8feff423cee4ca67672dac6.zip
tests: test source4 cmdline/smb.conf log level
The 'log level' line in smb.conf allows messages from different log classes to be sent to different places, but we have not tested that this works. Now we do, somewhat. The test involves running a special binary based on a stripped down source4/samba/server.c that just starts up, parses the command line and a given smb.conf, then logs messages from multiple classes and exits. Signed-off-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Diffstat (limited to 'python')
-rw-r--r--python/samba/tests/logfiles.py373
1 files changed, 373 insertions, 0 deletions
diff --git a/python/samba/tests/logfiles.py b/python/samba/tests/logfiles.py
new file mode 100644
index 00000000000..ac3487271b6
--- /dev/null
+++ b/python/samba/tests/logfiles.py
@@ -0,0 +1,373 @@
+# Unix SMB/CIFS implementation.
+#
+# Copyright (C) Catalyst.Net Ltd. 2022
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import subprocess
+import os
+from samba.tests import TestCaseInTempDir
+from pprint import pprint
+
+HERE = os.path.dirname(__file__)
+SERVER = os.path.join(HERE, '../../../../bin/test_s4_logging')
+
+CLASS_LIST = ["all", "tdb", "printdrivers", "lanman", "smb",
+ "rpc_parse", "rpc_srv", "rpc_cli", "passdb", "sam", "auth",
+ "winbind", "vfs", "idmap", "quota", "acls", "locking", "msdfs",
+ "dmapi", "registry", "scavenger", "dns", "ldb", "tevent",
+ "auth_audit", "auth_json_audit", "kerberos", "drs_repl",
+ "smb2", "smb2_credits", "dsdb_audit", "dsdb_json_audit",
+ "dsdb_password_audit", "dsdb_password_json_audit",
+ "dsdb_transaction_audit", "dsdb_transaction_json_audit",
+ "dsdb_group_audit", "dsdb_group_json_audit"]
+
+
+CLASS_CODES = {k: i for i, k in enumerate(CLASS_LIST)}
+
+
+class S4LoggingTests(TestCaseInTempDir):
+
+ def _write_smb_conf(self,
+ default_level=2,
+ default_file="default",
+ mapping=()):
+ self.smbconf = os.path.join(self.tempdir, "smb.conf")
+
+ with open(self.smbconf, "w") as f:
+ f.write('[global]\n')
+ if default_file is not None:
+ dest = os.path.join(self.tempdir,
+ default_file)
+ f.write(f" log file = {dest}\n")
+
+ f.write(" log level = ")
+ if default_level:
+ f.write(f"{default_level}")
+
+ for dbg_class, log_level, log_file in mapping:
+ f.write(' ')
+ f.write(dbg_class)
+ if log_level is not None:
+ f.write(f':{log_level}')
+ if log_file is not None:
+ dest = os.path.join(self.tempdir,
+ log_file)
+
+ f.write(f'@{dest}')
+ f.write('\n')
+ self.addCleanup(os.unlink, self.smbconf)
+
+ def _extract_log_level_line(self, new_level=2):
+ # extricate the 'log level' line from the smb.conf, returning
+ # the value, and replacing the log level line with something
+ # innocuous.
+ smbconf2 = self.smbconf + 'new'
+ with open(self.smbconf) as f:
+ with open(smbconf2, 'w') as f2:
+ for line in f:
+ if 'log level' in line:
+ debug_arg = line.split('=', 1)[1].strip()
+ if new_level is not None:
+ f2.write(f' log level = {new_level}\n')
+ else:
+ f2.write(line)
+ os.replace(smbconf2, self.smbconf)
+ return debug_arg
+
+ def _get_expected_strings(self, mapping,
+ level_filter,
+ default_file='default',
+ file_filter=None):
+ default = os.path.join(self.tempdir, default_file)
+ expected = {default: []}
+ # this kind of thing:
+ # " logging for 'dns' [21], at level 4"
+ for dbg_class, log_level, log_file in mapping:
+ if log_file is None:
+ log_file = default_file
+
+ f = os.path.join(self.tempdir, log_file)
+ expected.setdefault(f, [])
+ if log_level < level_filter:
+ continue
+ if file_filter not in (None, log_file):
+ continue
+ s = (f" logging for '{dbg_class}' [{CLASS_CODES[dbg_class]}], "
+ f"at level {level_filter}")
+ expected[f].append(s)
+
+ return expected
+
+ def _run_s4_logger(self, log_level, *extra_args):
+ cmd = [SERVER,
+ '-s', self.smbconf,
+ '-L', str(log_level),
+ *extra_args]
+
+ p = subprocess.run(cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ self.assertEqual(p.returncode, 0,
+ f"'{' '.join(cmd)}' failed ({p.returncode})")
+
+ return p.stdout.decode(), p.stderr.decode()
+
+ def assert_string_contains(self, string, expected_lines,
+ filename=None):
+ expected_lines = set(expected_lines)
+ string_lines = set(string.split('\n'))
+ present_lines = string_lines & expected_lines
+ if present_lines != expected_lines:
+ if filename:
+ print(filename)
+ print("expected %d lines, found %d" %
+ (len(expected_lines), len(present_lines)))
+ print("missing lines:")
+ pprint(expected_lines - present_lines)
+ raise AssertionError("missing lines")
+
+ def assert_file_contains(self, filename, expected_lines):
+ with open(filename) as f:
+ string = f.read()
+ self.assert_string_contains(string, expected_lines, filename)
+
+ def assert_n_known_lines_string(self, string, n):
+ count = string.count("logging for '")
+ if count != n:
+ raise AssertionError(
+ f"string has {count} lines, expected {n}")
+
+ def assert_n_known_lines(self, filename, n):
+ with open(filename) as f:
+ string = f.read()
+ count = string.count(" logging for '")
+ if count != n:
+ raise AssertionError(
+ f"{filename} has {count} lines, expected {n}")
+
+ def assert_unlink_expected_strings(self, expected_strings):
+ for k, v in expected_strings.items():
+ if not os.path.exists(k):
+ self.fail(f"{k} does not exist")
+ self.assert_file_contains(k, v)
+ self.assert_n_known_lines(k, len(v))
+ os.unlink(k)
+
+ def test_each_to_its_own(self):
+ level = 4
+ mapping = [(x, level, x) for x in CLASS_LIST]
+ expected_strings = self._get_expected_strings(mapping, level)
+
+ self._write_smb_conf(mapping=mapping)
+ stdout, stderr = self._run_s4_logger(level)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_all_to_one(self):
+ level = 4
+ dest = 'everything'
+ mapping = [(x, level, dest) for x in CLASS_LIST]
+ expected_strings = self._get_expected_strings(mapping, level)
+
+ self._write_smb_conf(mapping=mapping)
+ stdout, stderr = self._run_s4_logger(level)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_bifurcate(self):
+ level = 4
+ dests = ['even', 'odd']
+ mapping = [(x, level + 1, dests[i & 1])
+ for i, x in enumerate(CLASS_LIST)]
+ expected_strings = self._get_expected_strings(mapping, level)
+
+ self._write_smb_conf(mapping=mapping)
+ stdout, stderr = self._run_s4_logger(level)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_bifurcate_level_out_of_range(self):
+ # nothing will be logged, because we're logging at a too high
+ # level.
+ level = 4
+ dests = ['even', 'odd']
+ mapping = [(x, level - 1, dests[i & 1])
+ for i, x in enumerate(CLASS_LIST)]
+ expected_strings = self._get_expected_strings(mapping, level)
+
+ self._write_smb_conf(mapping=mapping)
+ stdout, stderr = self._run_s4_logger(level)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_bifurcate_misc_log_level(self):
+ # We are sending even numbers to default and odd numbers to
+ # 'odd', at various levels, depending on mod 3. Like this:
+ #
+ # log level = 2 all:5 \
+ # tdb:4@odd \
+ # printdrivers:3 \
+ # lanman:5@odd \
+ # smb:4 \
+ # rpc_parse:3@odd \
+ # rpc_srv:5 ...
+ #
+ # Therefore, 'default' should get classes that are (0 or 4) % 6
+ # and 'odd' should get classes that are (1 or 3) % 6.
+
+ level = 4
+ dests = [None, 'odd']
+ mapping = []
+ for i, x in enumerate(CLASS_LIST):
+ parity = i & 1
+ log_level = level + 1 - (i % 3)
+ mapping.append((x, log_level, dests[parity]))
+
+ expected_strings = self._get_expected_strings(mapping, level)
+
+ self._write_smb_conf(mapping=mapping)
+ stdout, stderr = self._run_s4_logger(level)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_all_different_ways_cmdline_d(self):
+ level = 4
+ dests = [None, 'a', 'b', 'c']
+ mapping = []
+ seed = 123
+ for i, x in enumerate(CLASS_LIST):
+ d = seed & 3
+ seed = seed * 17 + 1
+ log_level = seed % 10
+ seed &= 0xff
+ mapping.append((x, log_level, dests[d]))
+
+ expected_strings = self._get_expected_strings(mapping, level)
+
+ self._write_smb_conf(mapping=mapping)
+ debug_arg = self._extract_log_level_line(26)
+
+ stdout, stderr = self._run_s4_logger(level, '-d', debug_arg)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_all_different_ways_cmdline_d_interactive(self):
+ level = 4
+ dests = [None, 'a', 'b', 'c']
+ mapping = []
+ seed = 1234
+ for i, x in enumerate(CLASS_LIST):
+ d = seed & 3
+ seed = seed * 13 + 1
+ log_level = seed % 10
+ seed &= 0xff
+ mapping.append((x, log_level, dests[d]))
+
+ expected_strings = self._get_expected_strings(mapping, level)
+
+ self._write_smb_conf(mapping=mapping)
+ debug_arg = self._extract_log_level_line(None)
+ stdout, stderr = self._run_s4_logger(level, '-d', debug_arg, '-i')
+ expected_lines = []
+ for v in expected_strings.values():
+ # stderr doesn't end up with leading ' '
+ expected_lines.extend([x.strip() for x in v])
+
+ self.assert_string_contains(stderr, expected_lines)
+ self.assert_n_known_lines_string(stderr, len(expected_lines))
+
+ def test_only_some_level_0(self):
+ # running the logger with -L 0 makes the log messages run at
+ # level 0 (i.e DBG_ERR), so we always see them in default,
+ # even though smb.conf doesn't ask.
+ mapping = [(x, 3, ['default', 'bees']['b' in x])
+ for x in CLASS_LIST]
+ expected_strings = self._get_expected_strings(mapping, 0)
+ self._write_smb_conf(mapping=[x for x in mapping if x[2] == 'bees'])
+ stdout, stderr = self._run_s4_logger(0)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_only_some_level_3(self):
+ # here, we're expecting the unmentioned non-b classes to just
+ # disappear.
+ level = 3
+ mapping = [(x, level, 'bees') for x in CLASS_LIST if 'b' in x]
+ expected_strings = self._get_expected_strings(mapping, level)
+ self._write_smb_conf(mapping=[x for x in mapping if x[2] == 'bees'])
+ stdout, stderr = self._run_s4_logger(level)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_none(self):
+ level = 4
+ mapping = []
+ expected_strings = self._get_expected_strings(mapping, level)
+ self._write_smb_conf(mapping=mapping)
+ stdout, stderr = self._run_s4_logger(level)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_none_high_default(self):
+ # We set the default level to 5 and do nothing else special,
+ # which means we need a different mapping for the smb.conf
+ # than the expected strings.
+ level = 4
+ mapping = [(x, 5, 'default') for x in CLASS_LIST]
+ expected_strings = self._get_expected_strings(mapping, level)
+ # note the empty mapping in smb.conf
+ self._write_smb_conf(mapping=[], default_level=5)
+ stdout, stderr = self._run_s4_logger(level)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_none_high_cmdline_d(self):
+ # We set the default level to 2, but run the 'server' with -d 10.
+ level = 4
+ mapping = [(x, 10, 'default') for x in CLASS_LIST]
+ expected_strings = self._get_expected_strings(mapping, level)
+ # note the empty mapping in smb.conf
+ self._write_smb_conf(mapping=[])
+ stdout, stderr = self._run_s4_logger(level, '-d', '10')
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_interactive_high_default_simple(self):
+ # running with -i should send everything to stderr.
+ level = 4
+ mapping = [(x, 5, 'default') for x in CLASS_LIST]
+ expected_strings = self._get_expected_strings(mapping, level)
+ self._write_smb_conf(mapping=[], default_level=5)
+ stdout, stderr = self._run_s4_logger(level, '-i')
+ expected_lines = []
+ for v in expected_strings.values():
+ # stderr doesn't end up with leading ' '
+ expected_lines.extend([x.strip() for x in v])
+
+ self.assert_string_contains(stderr, expected_lines)
+
+ def test_interactive_complex_smb_conf(self):
+ # running with -i should send everything to stderr. The
+ # smb.conf will set the levels, but the target files are
+ # overridden.
+ # (this is the test_bifurcate_misc_log_level() smb.conf).
+ level = 4
+ dests = [None, 'odd']
+ mapping = []
+ for i, x in enumerate(CLASS_LIST):
+ parity = i & 1
+ log_level = level + 1 - (i % 3)
+ mapping.append((x, log_level, dests[parity]))
+
+ expected_strings = self._get_expected_strings(mapping, level)
+
+ self._write_smb_conf(mapping=mapping)
+ stdout, stderr = self._run_s4_logger(level, '-i')
+ expected_lines = []
+ for v in expected_strings.values():
+ # stderr doesn't end up with leading ' '
+ expected_lines.extend([x.strip() for x in v])
+
+ self.assert_string_contains(stderr, expected_lines)