Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 112 additions & 17 deletions src/DynamicData.Tests/Cache/DeadlockTortureTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ public sealed class DeadlockTortureTest
{
private const int ItemCount = 200;
private const int Iterations = 50;
private const int TimeoutSeconds = 15;
private const int TimeoutSeconds = 60;

private static async Task<bool> RunBidirectionalDeadlockTest(
Func<IObservable<IChangeSet<Person, string>>, IObservable<IChangeSet<Person, string>>> pipeline,
Action? subjectPusher = null,
int iterations = Iterations)
{
for (var iter = 0; iter < iterations; iter++)
Expand All @@ -44,11 +45,13 @@ private static async Task<bool> RunBidirectionalDeadlockTest(
using var aToB = pipeline(sourceA.Connect().Filter(x => x.Name.StartsWith("A"))).PopulateInto(sourceB);
using var bToA = pipeline(sourceB.Connect().Filter(x => x.Name.StartsWith("B"))).PopulateInto(sourceA);

using var barrier = new Barrier(2);
var participants = subjectPusher is null ? 2 : 3;
using var barrier = new Barrier(participants);
var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); });
var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); });
var taskC = subjectPusher is null ? null : Task.Run(() => { barrier.SignalAndWait(); subjectPusher(); });

var completed = Task.WhenAll(taskA, taskB);
var completed = taskC is null ? Task.WhenAll(taskA, taskB) : Task.WhenAll(taskA, taskB, taskC);
if (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds))) != completed)
return false;
}
Expand All @@ -64,26 +67,84 @@ [Fact] public async Task AutoRefresh_DoesNotDeadlock() =>
[Fact] public async Task GroupOn_DoesNotDeadlock() =>
(await RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()))).Should().BeTrue();

[Fact] public async Task GroupWithImmutableState_DoesNotDeadlock() =>
(await RunBidirectionalDeadlockTest(s => s.GroupWithImmutableState(p => p.Age % 3).TransformMany(g => g.Items, p => p.UniqueKey))).Should().BeTrue();

[Fact] public async Task Page_DoesNotDeadlock()
{
using var req = new BehaviorSubject<IPageRequest>(new PageRequest(1, 50));
(await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Page(req))).Should().BeTrue();
(await RunBidirectionalDeadlockTest(
s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Page(req),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); })).Should().BeTrue();
}

[Fact] public async Task SortAndPage_DoesNotDeadlock()
{
using var req = new BehaviorSubject<IPageRequest>(new PageRequest(1, 50));
(await RunBidirectionalDeadlockTest(
s => s.SortAndPage(SortExpressionComparer<Person>.Ascending(p => p.Age), req),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); })).Should().BeTrue();
}

[Fact] public async Task Virtualise_DoesNotDeadlock()
{
using var req = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, 50));
(await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Virtualise(req))).Should().BeTrue();
(await RunBidirectionalDeadlockTest(
s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Virtualise(req),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); })).Should().BeTrue();
Comment thread
dwcullop marked this conversation as resolved.
}

[Fact] public async Task SortAndVirtualize_DoesNotDeadlock()
{
using var req = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, 50));
(await RunBidirectionalDeadlockTest(
s => s.SortAndVirtualize(SortExpressionComparer<Person>.Ascending(p => p.Age), req),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); })).Should().BeTrue();
}

[Fact] public async Task QueryWhenChanged_DoesNotDeadlock()
{
for (var iter = 0; iter < Iterations; iter++)
{
using var sourceA = new SourceCache<Person, string>(p => p.UniqueKey);
using var sourceB = new SourceCache<Person, string>(p => p.UniqueKey);

// QueryWhenChanged with an itemChangedTrigger exercises the Merge branch.
// A side-channel write into the other cache closes the same ABBA cycle that
// PopulateInto would close for changeset-shaped operators.
using var aToB = sourceA.Connect()
.Filter(p => p.Name.StartsWith("A"))
.QueryWhenChanged(p => p.WhenPropertyChanged(x => x.Age))
.Subscribe(_ => sourceB.AddOrUpdate(new Person("A-marker", 0)));
using var bToA = sourceB.Connect()
.Filter(p => p.Name.StartsWith("B"))
.QueryWhenChanged(p => p.WhenPropertyChanged(x => x.Age))
.Subscribe(_ => sourceA.AddOrUpdate(new Person("B-marker", 0)));

using var barrier = new Barrier(2);
var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); });
var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); });

var completed = Task.WhenAll(taskA, taskB);
(await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)))).Should().BeSameAs(completed, "iteration " + iter);
}
}

[Fact] public async Task TransformWithForce_DoesNotDeadlock()
{
using var force = new Subject<Func<Person, string, bool>>();
(await RunBidirectionalDeadlockTest(s => s.Transform((p, k) => new Person("T-" + p.Name, p.Age), force))).Should().BeTrue();
(await RunBidirectionalDeadlockTest(
s => s.Transform((p, k) => new Person("T-" + p.Name, p.Age), force),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) force.OnNext(static (p, _) => true); })).Should().BeTrue();
}

[Fact] public async Task BatchIf_DoesNotDeadlock() =>
(await RunBidirectionalDeadlockTest(s => s.BatchIf(new BehaviorSubject<bool>(false), false, (TimeSpan?)null))).Should().BeTrue();
[Fact] public async Task BatchIf_DoesNotDeadlock()
{
using var pause = new BehaviorSubject<bool>(false);
(await RunBidirectionalDeadlockTest(
s => s.BatchIf(pause, false, (TimeSpan?)null),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pause.OnNext(j % 2 == 0); })).Should().BeTrue();
}

[Fact] public async Task DisposeMany_DoesNotDeadlock() =>
(await RunBidirectionalDeadlockTest(s => s.DisposeMany())).Should().BeTrue();
Expand All @@ -94,31 +155,65 @@ [Fact] public async Task OnItemRemoved_DoesNotDeadlock() =>
[Fact] public async Task AllDangerous_Stacked_DoNotDeadlock()
{
using var pageReq = new BehaviorSubject<IPageRequest>(new PageRequest(1, 100));
using var virtReq = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, 100));
using var force = new Subject<Func<Person, string, bool>>();
(await RunBidirectionalDeadlockTest(
s => s.AutoRefresh(p => p.Age)
s => s.GroupWithImmutableState(p => p.Age % 3)
.TransformMany(g => g.Items, p => p.UniqueKey)
.AutoRefresh(p => p.Age)
.Filter(p => p.Age >= 0)
.Transform((p, k) => new Person("X-" + p.Name, p.Age), force)
.OnItemRemoved(_ => { })
.DisposeMany()
.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age))
.Virtualise(virtReq)
.Page(pageReq),
subjectPusher: () =>
{
for (var j = 0; j < ItemCount; j++)
{
force.OnNext(static (p, _) => true);
pageReq.OnNext(new PageRequest(1 + (j % 4), 50 + (j % 4) * 50));
virtReq.OnNext(new VirtualRequest(j * 5, 50 + (j % 4) * 50));
}
},
iterations: Iterations * 2)).Should().BeTrue();
}

[Fact] public async Task MultiplePairs_Simultaneous_NoDeadlock()
{
using var pageReq = new BehaviorSubject<IPageRequest>(new PageRequest(1, 50));
using var pageReq2 = new BehaviorSubject<IPageRequest>(new PageRequest(1, 50));
using var virtReq = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, 50));
using var virtReq2 = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, 50));
using var pause = new BehaviorSubject<bool>(false);
var results = await Task.WhenAll(
RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)), 30),
RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age), 30),
RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()), 30),
RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }), 30),
RunBidirectionalDeadlockTest(s => s.DisposeMany(), 30),
RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Page(pageReq), 30),
RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Virtualise(virtReq), 30),
RunBidirectionalDeadlockTest(s => s.BatchIf(new BehaviorSubject<bool>(false), false, (TimeSpan?)null), 30));
RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)), iterations: 30),
RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age), iterations: 30),
RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()), iterations: 30),
RunBidirectionalDeadlockTest(s => s.GroupWithImmutableState(p => p.Age % 3).TransformMany(g => g.Items, p => p.UniqueKey), iterations: 30),
RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }), iterations: 30),
RunBidirectionalDeadlockTest(s => s.DisposeMany(), iterations: 30),
RunBidirectionalDeadlockTest(
s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Page(pageReq),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pageReq.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); },
iterations: 30),
RunBidirectionalDeadlockTest(
s => s.SortAndPage(SortExpressionComparer<Person>.Ascending(p => p.Age), pageReq2),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pageReq2.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); },
iterations: 30),
RunBidirectionalDeadlockTest(
s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Virtualise(virtReq),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) virtReq.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); },
iterations: 30),
RunBidirectionalDeadlockTest(
s => s.SortAndVirtualize(SortExpressionComparer<Person>.Ascending(p => p.Age), virtReq2),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) virtReq2.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); },
iterations: 30),
RunBidirectionalDeadlockTest(
s => s.BatchIf(pause, false, (TimeSpan?)null),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pause.OnNext(j % 2 == 0); },
iterations: 30));
results.Should().AllSatisfy(r => r.Should().BeTrue());
}

Expand Down
Loading
Loading