Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions src/DynamicData/List/ObservableListEx.Adapt.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Collections.ObjectModel;
using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;
using System.Linq.Expressions;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Binding;
using DynamicData.Cache.Internal;
using DynamicData.List.Internal;
using DynamicData.List.Linq;

// ReSharper disable once CheckNamespace
namespace DynamicData;

/// <summary>
/// Extensions for ObservableList.
/// </summary>
public static partial class ObservableListEx
{
/// <summary>
/// Injects a side effect into a changeset stream via an <see cref="IChangeSetAdaptor{T}"/>.
/// The adaptor's <c>Adapt</c> method is invoked for each changeset before it is forwarded downstream unchanged.
/// </summary>
/// <typeparam name="T">The type of items in the list.</typeparam>
/// <param name="source">The source <see cref="IObservable{IChangeSet{T}}"/> to observe and adapt.</param>

Check warning on line 31 in src/DynamicData/List/ObservableListEx.Adapt.cs

View workflow job for this annotation

GitHub Actions / build

Type parameter declaration must be an identifier not a type. See also error CS0081.

Check warning on line 31 in src/DynamicData/List/ObservableListEx.Adapt.cs

View workflow job for this annotation

GitHub Actions / build

Type parameter declaration must be an identifier not a type. See also error CS0081.

Check warning on line 31 in src/DynamicData/List/ObservableListEx.Adapt.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has syntactically incorrect cref attribute 'IObservable{IChangeSet{T}}'

Check warning on line 31 in src/DynamicData/List/ObservableListEx.Adapt.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has syntactically incorrect cref attribute 'IObservable{IChangeSet{T}}'
/// <param name="adaptor">The <see cref="IChangeSetAdaptor{T}"/> adaptor whose <c>Adapt</c> method is invoked for each changeset.</param>
/// <returns>A list changeset stream identical to the source, with the adaptor side effect applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="adaptor"/> is <see langword="null"/>.</exception>
/// <remarks>
/// <para>
/// This is the primary extension point for custom UI binding adaptors (e.g., <see cref="Bind{T}(IObservable{IChangeSet{T}}, IObservableCollection{T}, BindingOptions)"/>
/// delegates to this operator). If the adaptor throws, the exception propagates downstream as <c>OnError</c>.
/// </para>
/// </remarks>
/// <seealso cref="Bind{T}(IObservable{IChangeSet{T}}, IObservableCollection{T}, BindingOptions)"/>
public static IObservable<IChangeSet<T>> Adapt<T>(this IObservable<IChangeSet<T>> source, IChangeSetAdaptor<T> adaptor)
where T : notnull
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));
adaptor.ThrowArgumentNullExceptionIfNull(nameof(adaptor));

return Observable.Create<IChangeSet<T>>(
observer =>
{
var locker = InternalEx.NewLock();
return source.Synchronize(locker).Select(
changes =>
{
adaptor.Adapt(changes);
return changes;
}).SubscribeSafe(observer);
});
}
}
51 changes: 51 additions & 0 deletions src/DynamicData/List/ObservableListEx.AddKey.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Collections.ObjectModel;
using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;
using System.Linq.Expressions;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Binding;
using DynamicData.Cache.Internal;
using DynamicData.List.Internal;
using DynamicData.List.Linq;

// ReSharper disable once CheckNamespace
namespace DynamicData;

/// <summary>
/// Extensions for ObservableList.
/// </summary>
public static partial class ObservableListEx
{
/// <summary>
/// Adds a key to each item in a list changeset, converting it to a cache changeset that supports all keyed DynamicData operators.
/// </summary>
/// <typeparam name="TObject">The type of items in the list.</typeparam>
/// <typeparam name="TKey">The type of the key.</typeparam>
/// <param name="source">The source <see cref="IObservable{IChangeSet{TObject}}"/> to add keys to, converting to a cache changeset.</param>

Check warning on line 31 in src/DynamicData/List/ObservableListEx.AddKey.cs

View workflow job for this annotation

GitHub Actions / build

Type parameter declaration must be an identifier not a type. See also error CS0081.

Check warning on line 31 in src/DynamicData/List/ObservableListEx.AddKey.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has syntactically incorrect cref attribute 'IObservable{IChangeSet{TObject}}'
/// <param name="keySelector">A <see cref="Func{T, TResult}"/> function to extract a unique key from each item.</param>
/// <returns>A cache <see cref="IObservable{IChangeSet{TObject, TKey}}"/> changeset stream with keyed items.</returns>

Check warning on line 33 in src/DynamicData/List/ObservableListEx.AddKey.cs

View workflow job for this annotation

GitHub Actions / build

Type parameter declaration must be an identifier not a type. See also error CS0081.

Check warning on line 33 in src/DynamicData/List/ObservableListEx.AddKey.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has syntactically incorrect cref attribute 'IObservable{IChangeSet{TObject, TKey}}'
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is <see langword="null"/>.</exception>
/// <remarks>
/// <para>
/// All index information is dropped during conversion because cache changesets are unordered by default.
/// Use this when you need to transition from list-based pipelines to cache-based operators (Filter by key, Join, Group, etc.).
/// </para>
/// </remarks>
/// <seealso cref="ObservableCacheEx.RemoveKey{TObject, TKey}(IObservable{IChangeSet{TObject, TKey}})"/>
public static IObservable<IChangeSet<TObject, TKey>> AddKey<TObject, TKey>(this IObservable<IChangeSet<TObject>> source, Func<TObject, TKey> keySelector)
where TObject : notnull
where TKey : notnull
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));
keySelector.ThrowArgumentNullExceptionIfNull(nameof(keySelector));

return source.Select(changes => new ChangeSet<TObject, TKey>(new AddKeyEnumerator<TObject, TKey>(changes, keySelector)));
}
}
156 changes: 156 additions & 0 deletions src/DynamicData/List/ObservableListEx.And.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Collections.ObjectModel;
using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;
using System.Linq.Expressions;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Binding;
using DynamicData.Cache.Internal;
using DynamicData.List.Internal;
using DynamicData.List.Linq;

// ReSharper disable once CheckNamespace
namespace DynamicData;

/// <summary>
/// Extensions for ObservableList.
/// </summary>
public static partial class ObservableListEx
{
/// <summary>
/// Applies a logical AND (intersection) between multiple list changeset streams.
/// Only items present in ALL sources appear in the result.
/// </summary>
/// <typeparam name="T">The type of items in the lists.</typeparam>
/// <param name="source">The first source <see cref="IObservable{IChangeSet{T}}"/> to intersect.</param>
/// <param name="others">The additional <see cref="IObservable{IChangeSet{T}}"/> changeset streams to intersect with.</param>
/// <returns>A list changeset stream containing items that exist in every source.</returns>
/// <exception cref="ArgumentNullException"><paramref name="others"/> is <see langword="null"/>.</exception>
/// <remarks>
/// <para>
/// Uses reference counting per item across all sources. An item appears downstream only when
/// its reference count is non-zero in ALL sources. Item identity is determined by the default equality comparer.
/// </para>
/// <list type="table">
/// <listheader><term>Event</term><description>Behavior</description></listheader>
/// <item><term>Add/AddRange</term><description>The item's reference count is incremented in its source tracker. If the item is now present in all sources, an <b>Add</b> is emitted.</description></item>
/// <item><term>Replace</term><description>The old item's reference count is decremented and the new item's is incremented. Depending on whether each is present in ALL sources, this emits an <b>Add</b>, <b>Remove</b>, <b>Replace</b>, or nothing.</description></item>
/// <item><term>Remove/RemoveRange/Clear</term><description>The item's reference count is decremented. If it was in the result and is no longer in all sources, a <b>Remove</b> is emitted.</description></item>
/// <item><term>Refresh</term><description>Forwarded as <b>Refresh</b> if the item is currently in the result.</description></item>
/// <item><term>Moved</term><description>Ignored (set operations are position-independent).</description></item>
/// </list>
/// <para><b>Worth noting:</b> Item identity uses object equality, not position. Duplicate items in a single source are reference-counted independently.</para>
/// </remarks>
/// <seealso cref="Or{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <seealso cref="Except{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <seealso cref="Xor{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <seealso cref="ObservableCacheEx.And{TObject, TKey}(IObservable{IChangeSet{TObject, TKey}}, IObservable{IChangeSet{TObject, TKey}}[])"/>
public static IObservable<IChangeSet<T>> And<T>(this IObservable<IChangeSet<T>> source, params IObservable<IChangeSet<T>>[] others)
where T : notnull
{
others.ThrowArgumentNullExceptionIfNull(nameof(others));

return source.Combine(CombineOperator.And, others);
}

/// <inheritdoc cref="And{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <param name="sources">A <see cref="ICollection{T}"/> of changeset streams to intersect.</param>
/// <remarks>
/// <inheritdoc cref="And{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <para>This overload accepts a pre-built collection of sources instead of a params array.</para>
/// </remarks>
public static IObservable<IChangeSet<T>> And<T>(this ICollection<IObservable<IChangeSet<T>>> sources)
where T : notnull => sources.Combine(CombineOperator.And);

/// <inheritdoc cref="And{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <param name="sources">An <see cref="IObservableList{T}"/> of changeset streams. Sources can be added or removed dynamically.</param>
/// <remarks>
/// <inheritdoc cref="And{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <para>This overload supports dynamic source management: adding or removing changeset streams from the observable list triggers re-evaluation.</para>
/// </remarks>
public static IObservable<IChangeSet<T>> And<T>(this IObservableList<IObservable<IChangeSet<T>>> sources)
where T : notnull => sources.Combine(CombineOperator.And);

/// <inheritdoc cref="And{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <param name="sources">An <see cref="IObservableList{IObservableList{T}}"/> of <see cref="IObservableList{IObservableList{T}}"/>. Each inner list's changes are connected automatically.</param>
/// <remarks>
/// <inheritdoc cref="And{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <para>This overload accepts <see cref="IObservableList{T}"/> instances directly, calling <c>Connect()</c> internally.</para>
/// </remarks>
public static IObservable<IChangeSet<T>> And<T>(this IObservableList<IObservableList<T>> sources)
where T : notnull => sources.Combine(CombineOperator.And);

/// <inheritdoc cref="And{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <param name="sources">An <see cref="IObservableList{ISourceList{T}}"/> of <see cref="ISourceList{T}"/>. Each inner list's changes are connected automatically.</param>
/// <remarks>
/// <inheritdoc cref="And{T}(IObservable{IChangeSet{T}}, IObservable{IChangeSet{T}}[])"/>
/// <para>This overload accepts <see cref="ISourceList{T}"/> instances directly, calling <c>Connect()</c> internally.</para>
/// </remarks>
public static IObservable<IChangeSet<T>> And<T>(this IObservableList<ISourceList<T>> sources)
where T : notnull => sources.Combine(CombineOperator.And);

private static IObservable<IChangeSet<T>> Combine<T>(this ICollection<IObservable<IChangeSet<T>>> sources, CombineOperator type)
where T : notnull
{
sources.ThrowArgumentNullExceptionIfNull(nameof(sources));

return new Combiner<T>(sources, type).Run();
}

private static IObservable<IChangeSet<T>> Combine<T>(this IObservable<IChangeSet<T>> source, CombineOperator type, params IObservable<IChangeSet<T>>[] others)
where T : notnull
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));
others.ThrowArgumentNullExceptionIfNull(nameof(others));

if (others.Length == 0)
{
throw new ArgumentException("Must be at least one item to combine with", nameof(others));
}

var items = source.EnumerateOne().Union(others).ToList();
return new Combiner<T>(items, type).Run();
}

private static IObservable<IChangeSet<T>> Combine<T>(this IObservableList<ISourceList<T>> sources, CombineOperator type)
where T : notnull
{
sources.ThrowArgumentNullExceptionIfNull(nameof(sources));

return Observable.Create<IChangeSet<T>>(
observer =>
{
var changesSetList = sources.Connect().Transform(s => s.Connect()).AsObservableList();
var subscriber = changesSetList.Combine(type).SubscribeSafe(observer);
return new CompositeDisposable(changesSetList, subscriber);
});
}

private static IObservable<IChangeSet<T>> Combine<T>(this IObservableList<IObservableList<T>> sources, CombineOperator type)
where T : notnull
{
sources.ThrowArgumentNullExceptionIfNull(nameof(sources));

return Observable.Create<IChangeSet<T>>(
observer =>
{
var changesSetList = sources.Connect().Transform(s => s.Connect()).AsObservableList();
var subscriber = changesSetList.Combine(type).SubscribeSafe(observer);
return new CompositeDisposable(changesSetList, subscriber);
});
}

private static IObservable<IChangeSet<T>> Combine<T>(this IObservableList<IObservable<IChangeSet<T>>> sources, CombineOperator type)
where T : notnull
{
sources.ThrowArgumentNullExceptionIfNull(nameof(sources));

return new DynamicCombiner<T>(sources, type).Run();
}
}
64 changes: 64 additions & 0 deletions src/DynamicData/List/ObservableListEx.AsObservableList.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Collections.ObjectModel;
using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;
using System.Linq.Expressions;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Binding;
using DynamicData.Cache.Internal;
using DynamicData.List.Internal;
using DynamicData.List.Linq;

// ReSharper disable once CheckNamespace
namespace DynamicData;

/// <summary>
/// Extensions for ObservableList.
/// </summary>
public static partial class ObservableListEx
{
/// <summary>
/// Wraps a <see cref="ISourceList{T}"/> as a read-only <see cref="IObservableList{T}"/>, hiding mutation methods.
/// </summary>
/// <typeparam name="T">The type of items in the list.</typeparam>
/// <param name="source">The <see cref="ISourceList{T}"/> mutable source list to wrap.</param>
/// <returns>A read-only observable list that mirrors the source.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is <see langword="null"/>.</exception>
public static IObservableList<T> AsObservableList<T>(this ISourceList<T> source)
where T : notnull
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));

return new AnonymousObservableList<T>(source);
}

/// <summary>
/// Materializes a changeset stream into a read-only <see cref="IObservableList{T}"/>.
/// The list is kept in sync with the source stream for the lifetime of the subscription.
/// </summary>
/// <typeparam name="T">The type of items in the list.</typeparam>
/// <param name="source">The source <see cref="IObservable{IChangeSet{T}}"/> to materialize into a read-only list.</param>
/// <returns>A read-only observable list reflecting the current state of the stream.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is <see langword="null"/>.</exception>
/// <remarks>
/// <para>
/// This is the primary way to <b>multicast</b> a changeset pipeline. Materializing once into an <see cref="IObservableList{T}"/>,
/// then calling <c>Connect()</c> on the result for each downstream consumer, ensures the upstream operators are evaluated only once
/// regardless of how many subscribers consume the result.
/// </para>
/// </remarks>
/// <seealso cref="AsObservableList{T}(ISourceList{T})"/>
public static IObservableList<T> AsObservableList<T>(this IObservable<IChangeSet<T>> source)
where T : notnull
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));

return new AnonymousObservableList<T>(source);
}
}
Loading
Loading