#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Tests various getncchanges replication scenarios
#
# Copyright (C) Catalyst.Net Ltd. 2017
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#
# Usage:
# export DC1=dc1_dns_name
# export DC2=dc2_dns_name
# export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN \
# getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
#
import drs_base
import samba.tests
import ldb
from ldb import SCOPE_BASE
import random
from samba.dcerpc import drsuapi, misc
from samba import WERRORError
from samba import werror
class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
    """
    Prepares the per-test fixture: resets the replication tracking state,
    points the test at DC2, creates a dedicated test OU (removed again on
    cleanup) and installs the default DC connection.
    """
    super(DrsReplicaSyncIntegrityTestCase, self).setUp()
    self.init_test_state()

    # DC2 is the DC that has the testenv-specific quirks (e.g. it's the
    # vampire_dc), so that's the DC this test talks to directly
    self.set_test_ldb_dc(self.ldb_dc2)

    # The OU name is built from the last dotted component of the test ID
    test_name = self.id().rsplit(".", 1)[1]
    test_ou = samba.tests.create_test_ou(self.test_ldb_dc,
                                         "getncchanges." + test_name)
    self.ou = str(test_ou)

    # Register removal of the OU, passing the tree_delete control
    self.addCleanup(self.ldb_dc2.delete, self.ou, ["tree_delete:1"])

    self.base_dn = self.test_ldb_dc.get_default_basedn()

    connection = DcConnection(self, self.ldb_dc2, self.dnsname_dc2)
    self.default_conn = connection
    self.set_dc_connection(connection)
def init_test_state(self):
    """Puts all the per-test replication tracking state back to defaults"""
    # no previous response to continue on from yet
    self.last_ctr = None

    # bookkeeping lists (presumably of what has been received so far);
    # each one starts out empty
    self.rxd_dn_list = list()
    self.rxd_links = list()
    self.rxd_guids = list()

    # Microsoft seems to honour a max_objects of no less than 100 (and no
    # more than 400ish), so these tests use the minimum
    self.max_objects = 100

    # remembers whether the GET_TGT/GET_ANC flags were used in the requests
    self.used_get_anc = False
    self.used_get_tgt = False
def add_object(self, dn, objectclass="organizationalunit"):
    """
    Adds an object with the given DN to the test DC (an OU, unless another
    objectclass is specified) and checks it can then be found.
    """
    record = {"dn": dn, "objectclass": objectclass}
    self.test_ldb_dc.add(record)

    # sanity-check: a base-scope search on the new DN finds exactly one entry
    found = self.test_ldb_dc.search(base=dn, scope=SCOPE_BASE)
    self.assertEqual(len(found), 1)
def modify_object(self, dn, attr, value):
    """Adds an attribute value to an object, which modifies the object's USN"""
    msg = ldb.Message()
    msg.dn = ldb.Dn(self.test_ldb_dc, dn)

    # FLAG_MOD_ADD: the value is added to the named attribute
    element = ldb.MessageElement(value, ldb.FLAG_MOD_ADD, attr)
    msg[attr] = element

    self.test_ldb_dc.modify(msg)
def delete_attribute(self, dn, attr, value):
    """Removes the given attribute value from an object on the test DC"""
    msg = ldb.Message()
    msg.dn = ldb.Dn(self.test_ldb_dc, dn)

    # FLAG_MOD_DELETE: the value is deleted from the named attribute
    element = ldb.MessageElement(value, ldb.FLAG_MOD_DELETE, attr)
    msg[attr] = element

    self.test_ldb_dc.modify(msg)
def start_new_repl_cycle(self):
    """Clears just enough state info that a new replication cycle can begin"""
    # forget which of the GET_TGT/GET_ANC flags the previous cycle used
    self.used_get_anc = False
    self.used_get_tgt = False

    # Only rxd_links gets reset. rxd_guids and rxd_dn_list are left alone so
    # that we still know whether a parent/target is unknown and therefore
    # needs GET_ANC/GET_TGT to resolve
    self.rxd_links = []

    # self.last_ctr is mostly preserved, so that the last HWM gets used
    previous_ctr = self.last_ctr
    if previous_ctr is None:
        return
    previous_ctr.more_data = True
def create_object_range(self, start, end, prefix="",
                        children=None, parent_list=None):
    """
    Creates a block of objects. Object names are numbered sequentially
    from start up to (but not including) end, using the optional prefix
    supplied. If the children parameter is supplied it will create a
    parent-child hierarchy: one child per entry in children is added
    beneath every top-level object created by this call.

    If parent_list is supplied, the DNs of the top-level objects created
    by this call are appended to it, so the caller can get at the parents
    separately.

    Returns a list of every DN created: all the parents first, followed by
    all the children.
    """
    # Use dummy/empty lists if we're not creating a parent/child hierarchy
    if children is None:
        children = []
    if parent_list is None:
        parent_list = []

    # Create the parents first, then the children.
    # This makes it easier to see in debug when GET_ANC takes effect
    # because the parent/children become interleaved (by default,
    # this approach means the objects are organized into blocks of
    # parents and blocks of children together)
    new_parents = []
    for x in range(start, end):
        ou = "OU=test_ou_%s%d,%s" % (prefix, x, self.ou)
        self.add_object(ou)
        new_parents.append(ou)

        # keep track of the top-level parents (if needed)
        parent_list.append(ou)

    dn_list = list(new_parents)

    # Create the block of children (if needed). Each child is paired with
    # the parent created above for the same number, rather than indexing
    # into parent_list by that number: the latter only works when start
    # happens to equal the length parent_list had on entry.
    for x, parent in zip(range(start, end), new_parents):
        for child in children:
            ou = "OU=test_ou_child%s%d,%s" % (child, x, parent)
            self.add_object(ou)
            dn_list.append(ou)

    return dn_list
def assert_expected_data(self, expected_list):
    """
    Checks that every DN in expected_list is present in the DNs received
    (self.rxd_dn_list), failing the test on the first one that is missing.
    """
    rxd_dns = self.rxd_dn_list

    # A difference in the object counts is only reported, it doesn't fail
    # the test: with GET_ANC Windows can end up sending the same parent
    # object multiple times, which is noteworthy but not an error
    expected_count = len(expected_list)
    rxd_count = len(rxd_dns)
    if rxd_count != expected_count:
        print("Note: received %d objects but expected %d" % (rxd_count,
                                                             expected_count))

    # Every single object we were expecting must have been received
    for expected_dn in expected_list:
        was_received = expected_dn in rxd_dns
        self.assertTrue(was_received,
                        "DN '%s' missing from replication." % expected_dn)
def test_repl_integrity(self):
    """
    Checks that no objects get lost when the objects being replicated are
    modified part-way through the replication.
    """
    # Samba and Windows servers behave differently here. Samba returns the
    # objects in the original order (up to the pre-modify HWM). Windows
    # incorporates the modified objects and returns them in the new order
    # (i.e. modified objects last), up to the post-modify HWM. The
    # Microsoft docs state the Windows behaviour is optional.

    # Create the range of objects that will get replicated
    all_dns = self.create_object_range(0, 400)
    hwm_before, _ = self._get_highest_hwm_utdv(self.test_ldb_dc)

    # Request the first page of 100 objects. The order the objects arrive
    # in doesn't matter to this test, as long as everything has been
    # received by the end
    self.repl_get_next()

    # Now modify some of the objects on the second page, which should bump
    # the highwatermark
    for idx in range(100, 200):
        new_name = "OU%d" % idx
        self.modify_object(all_dns[idx], "displayName", new_name)

    hwm_after, _ = self._get_highest_hwm_utdv(self.test_ldb_dc)
    usn_has_advanced = hwm_after.highest_usn > hwm_before.highest_usn
    self.assertTrue(usn_has_advanced)

    # Fetch the rest of the data, one block at a time
    while not self.replication_complete():
        self.repl_get_next()

    # All of the objects we created should still have been received
    self.assert_expected_data(all_dns)
def is_parent_known(self, dn, known_dn_list):
"""
Returns True if the parent of the dn specified is in known_dn_list
"""
# we can sometimes get system objects like the RID Manager returned.
# Ignore anything that is not under the test OU we created
if self.ou not in dn:
return True
|