Tuesday, September 22, 2009

ThreadSafeDictionary

Threading is a powerful tool today, since almost every client computer has two or more CPUs. On the other hand, dictionary objects are often used to store important information that has to be provided and accessed for different purposes. Unfortunately, as of today (.NET 3.5), .NET does not offer a built-in thread-safe implementation of a dictionary. The System.Collections.Hashtable class provides a static Synchronized() method. Nevertheless, this method has two problems. On the one hand, Hashtable is not a strongly typed implementation, so it always becomes risky if you use a Hashtable in more than a small scope. On the other hand, MSDN says that the synchronized wrapper supports multiple writing threads only as long as no other threads are reading, so it is not thread safe for concurrent reading and writing access.

Here you can find a complete implementation of a thread-safe dictionary which might be helpful for your purposes. It implements the following interfaces:

  • System.Collections.IEnumerable
  • System.Collections.IDictionary
  • System.Collections.Generic.IDictionary<TKey, TValue>
  • System.Collections.Generic.ICollection<KeyValuePair<TKey, TValue>>
  • System.Collections.Generic.IEnumerable<KeyValuePair<TKey, TValue>>
In addition to a thread safe read/write implementation of a usual dictionary it provides three special methods:

TryAdd(TKey, TValue)
Works like "TryGetValue". It tries to add a specified key and value to the dictionary if it is not yet present; otherwise it does not change the existing items.


Lock()
Acquires an exclusive lock on the dictionary for the current thread.


Unlock()
Releases the exclusive lock held by the current thread.


A sample showing how to use the ThreadSafeDictionary in a multi-threaded scenario can be found at the end of this post.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;

namespace FR.Collections.Generic
{
   /// <summary>
   /// Represents a thread safe version of a 
   /// <see cref="IDictionary{TKey,TValue}"/>. Every operation is 
   /// executed while holding one private monitor lock on top of an 
   /// inner <see cref="Dictionary{TKey,TValue}"/>.
   /// </summary>
   /// <remarks>
   /// Members that hand out a view of the content (<see cref="Keys"/>, 
   /// <see cref="Values"/> and the enumerators) return a snapshot taken 
   /// under the lock. The snapshot can be read without any further 
   /// locking and does not reflect later changes to the dictionary.
   /// Interface members whose signature matches a public member are 
   /// implemented implicitly by that public member.
   /// </remarks>
   /// <typeparam name="TKey">The key to be used for items stored within the 
   /// dictionary.</typeparam>
   /// <typeparam name="TValue">The value to be handled within the 
   /// dictionary for a specified key.</typeparam>
   [DebuggerDisplay("Count = {Count}")]
   public class ThreadSafeDictionary<TKey, TValue>
   :
   IDictionary<TKey, TValue>,
   ICollection<KeyValuePair<TKey, TValue>>,
   IEnumerable<KeyValuePair<TKey, TValue>>,
   IEnumerable,
   IDictionary
   {
      #region Constructor

      /// <summary>
      /// Creates a new instance of a 
      /// <see cref="ThreadSafeDictionary{TKey,TValue}"/> with 
      /// default settings.
      /// </summary>
      public ThreadSafeDictionary()
      {
         _dict = new Dictionary<TKey, TValue>();
      }

      /// <summary>
      /// Creates a new instance of a 
      /// <see cref="ThreadSafeDictionary{TKey,TValue}"/> 
      /// and adds all items from another dictionary.
      /// </summary>
      /// <param name="dictionary">
      /// The <see cref="IDictionary{TKey,TValue}"/> whose items are 
      /// copied into this instance.
      /// </param>
      public ThreadSafeDictionary(IDictionary<TKey, TValue> dictionary)
      {
         _dict = new Dictionary<TKey, TValue>(dictionary);
      }

      /// <summary>
      /// Creates a new instance of a thread safe dictionary which uses a 
      /// specified comparer to identify the uniqueness of added keys.
      /// </summary>
      /// <param name="comparer">The 
      /// <see cref="IEqualityComparer{T}"/> used to compare keys.</param>
      public ThreadSafeDictionary(IEqualityComparer<TKey> comparer)
      {
         _dict = new Dictionary<TKey, TValue>(comparer);
      }

      /// <summary>
      /// Creates a new instance of an empty 
      /// <see cref="ThreadSafeDictionary{TKey,TValue}"/>
      /// with a specified initial capacity.
      /// </summary>
      /// <param name="initialCapacity">The initial capacity passed to the 
      /// inner dictionary.</param>
      public ThreadSafeDictionary(int initialCapacity)
      {
         _dict = new Dictionary<TKey, TValue>(initialCapacity);
      }

      /// <summary>
      /// Creates a new instance of a 
      /// <see cref="ThreadSafeDictionary{TKey,TValue}"/> 
      /// which initially contains the items of another 
      /// <see cref="IDictionary{TKey,TValue}"/> and uses a 
      /// specified comparer to compare keys.
      /// </summary>
      /// <param name="dictionary">
      /// The <see cref="IDictionary{TKey,TValue}"/> whose items are 
      /// copied into this instance.
      /// </param>
      /// <param name="comparer">
      /// The <see cref="IEqualityComparer{T}"/> used to compare keys.
      /// </param>
      public ThreadSafeDictionary(IDictionary<TKey, TValue> dictionary, IEqualityComparer<TKey> comparer)
      {
         _dict = new Dictionary<TKey, TValue>(dictionary, comparer);
      }

      /// <summary>
      /// Creates a new instance of a 
      /// <see cref="ThreadSafeDictionary{TKey,TValue}"/> 
      /// with an initial capacity and a specified 
      /// <see cref="IEqualityComparer{T}"/> to compare keys.
      /// </summary>
      /// <param name="initialCapacity">
      /// The initial capacity passed to the inner dictionary.
      /// </param>
      /// <param name="comparer">
      /// The <see cref="IEqualityComparer{T}"/> used to compare keys.
      /// </param>
      public ThreadSafeDictionary(int initialCapacity, IEqualityComparer<TKey> comparer)
      {
         _dict = new Dictionary<TKey, TValue>(initialCapacity, comparer);
      }

      #endregion

      #region Private Fields

      // The wrapped dictionary; only touched while _sync is held.
      private readonly Dictionary<TKey, TValue> _dict;
      // Private lock object guarding every access to _dict.
      private readonly object _sync = new object();

      #endregion

      #region Public Indexers

      /// <summary>
      /// Gets or sets the value associated with the specified key.
      /// </summary>
      /// <param name="key">
      /// The key of the value to get or set.
      /// </param>
      /// <returns>
      /// The value associated with the specified key. If the 
      /// specified key is not found, a get operation throws 
      /// a <see cref="KeyNotFoundException"/>.
      /// </returns>
      public TValue this[TKey key]
      {
         get
         {
            lock (_sync)
            {
               return _dict[key];
            }
         }
         set
         {
            lock (_sync)
            {
               _dict[key] = value;
            }
         }
      }

      #endregion

      #region Public Properties

      /// <summary>
      /// Gets the number of key/value pairs contained in the 
      /// <see cref="ThreadSafeDictionary{TKey,TValue}"/>.
      /// </summary>
      public int Count
      {
         get
         {
            lock (_sync)
            {
               return _dict.Count;
            }
         }
      }

      /// <summary>
      /// Gets a snapshot of the keys contained in the 
      /// <see cref="ThreadSafeDictionary{TKey,TValue}"/>. The returned 
      /// collection is a copy and is not affected by later changes.
      /// </summary>
      public ICollection<TKey> Keys
      {
         get { return SnapshotKeys(); }
      }

      /// <summary>
      /// Gets a snapshot of the values contained in the 
      /// <see cref="ThreadSafeDictionary{TKey,TValue}"/>. The returned 
      /// collection is a copy and is not affected by later changes.
      /// </summary>
      public ICollection<TValue> Values
      {
         get { return SnapshotValues(); }
      }

      #endregion

      #region Public Methods

      /// <summary>
      /// Adds a specified key and value to the dictionary.
      /// </summary>
      /// <param name="key">
      /// The key of the new item to add to this dictionary.
      /// </param>
      /// <param name="value">
      /// The value to be associated with the specified key.
      /// </param>
      public void Add(TKey key, TValue value)
      {
         lock (_sync)
         {
            _dict.Add(key, value);
         }
      }

      /// <summary>
      /// Removes all keys and values from this instance of a 
      /// <see cref="ThreadSafeDictionary{TKey,TValue}"/>.
      /// </summary>
      public void Clear()
      {
         lock (_sync)
         {
            _dict.Clear();
         }
      }

      /// <summary>
      /// Determines if a specified key is currently present within this 
      /// <see cref="ThreadSafeDictionary{TKey,TValue}"/>.
      /// </summary>
      /// <param name="key">
      /// The key to find within this dictionary.
      /// </param>
      /// <returns>
      /// True if the specified key is present within the 
      /// dictionary; otherwise false.
      /// </returns>
      public bool ContainsKey(TKey key)
      {
         lock (_sync)
         {
            return _dict.ContainsKey(key);
         }
      }

      /// <summary>
      /// Copies all <see cref="KeyValuePair{TKey,TValue}"/> 
      /// which are currently present within this instance of a 
      /// <see cref="ThreadSafeDictionary{TKey,TValue}"/> 
      /// to a specified array.
      /// </summary>
      /// <param name="array">
      /// The one-dimensional array to copy the current items of 
      /// this dictionary into.
      /// </param>
      /// <param name="arrayIndex">
      /// The zero-based index within the destination array
      /// at which copying starts.
      /// </param>
      public void CopyTo(KeyValuePair<TKey, TValue>[] array, int arrayIndex)
      {
         lock (_sync)
         {
            ((ICollection<KeyValuePair<TKey, TValue>>)_dict).CopyTo(array, arrayIndex);
         }
      }

      /// <summary>
      /// Returns an <see cref="IEnumerator{T}"/> that iterates over a 
      /// snapshot of the items taken at the time of the call.
      /// </summary>
      /// <returns>
      /// An <see cref="IEnumerator{T}"/> over a copy of the items, so 
      /// the dictionary may be changed while the caller is iterating.
      /// </returns>
      public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
      {
         List<KeyValuePair<TKey, TValue>> snapshot;
         lock (_sync)
         {
            // Copy under the lock; the enumerator itself is used by the
            // caller after the lock has been released.
            snapshot = new List<KeyValuePair<TKey, TValue>>(_dict);
         }
         return snapshot.GetEnumerator();
      }

      /// <summary>
      /// Acquires the dictionary's lock for the calling thread by 
      /// entering the monitor that all other members use. 
      /// As long as the 
      /// <see cref="ThreadSafeDictionary{TKey,TValue}"/> 
      /// is locked by this method, no other thread can read or 
      /// write data into or from the dictionary.
      /// Every call must be paired with a call to <see cref="Unlock"/>, 
      /// preferably inside a finally block.
      /// </summary>
      public void Lock()
      {
         Monitor.Enter(_sync);
      }

      /// <summary>
      /// Removes an item for a specified key from this instance of a 
      /// <see cref="ThreadSafeDictionary{TKey,TValue}"/>.
      /// </summary>
      /// <param name="key">
      /// The key of the item to remove.
      /// </param>
      /// <returns>
      /// True if the specified key was present and 
      /// successfully removed; otherwise false.</returns>
      public bool Remove(TKey key)
      {
         lock (_sync)
         {
            return _dict.Remove(key);
         }
      }

      /// <summary>
      /// Tries to add a new item to the 
      /// <see cref="ThreadSafeDictionary{TKey,TValue}"/>. 
      /// If the specified key is currently not present the 
      /// new key/value pair will be added; otherwise nothing happens.
      /// The check and the add happen under one lock acquisition.
      /// </summary>
      /// <param name="key">
      /// The new key to be added.
      /// </param>
      /// <param name="value">
      /// The new value to be added for the specified key.
      /// </param>
      /// <returns>
      /// True if the new key/value was added to the dictionary; 
      /// otherwise false.
      /// </returns>
      public bool TryAdd(TKey key, TValue value)
      {
         lock (_sync)
         {
            if (_dict.ContainsKey(key))
               return false;
            _dict.Add(key, value);
            return true;
         }
      }

      /// <summary>
      /// Tries to get a value for a specified key from this instance 
      /// of a <see cref="ThreadSafeDictionary{TKey,TValue}"/> 
      /// into a specified out parameter.
      /// </summary>
      /// <param name="key">
      /// The key to get an item for.
      /// </param>
      /// <param name="value">
      /// Receives the value stored for the key if it was found; 
      /// otherwise the default value of <typeparamref name="TValue"/>.
      /// </param>
      /// <returns>
      /// True if the specified key was found within the dictionary; 
      /// otherwise false.
      /// </returns>
      public bool TryGetValue(TKey key, out TValue value)
      {
         lock (_sync)
         {
            return _dict.TryGetValue(key, out value);
         }
      }

      /// <summary>
      /// Releases the lock for this 
      /// <see cref="ThreadSafeDictionary{TKey,TValue}"/>
      /// acquired by calling the <see cref="Lock"/> method.
      /// </summary>
      public void Unlock()
      {
         Monitor.Exit(_sync);
      }

      #endregion

      #region Private Helpers

      // Copies the current keys into a new array while holding the lock.
      private TKey[] SnapshotKeys()
      {
         lock (_sync)
         {
            TKey[] keys = new TKey[_dict.Count];
            _dict.Keys.CopyTo(keys, 0);
            return keys;
         }
      }

      // Copies the current values into a new array while holding the lock.
      private TValue[] SnapshotValues()
      {
         lock (_sync)
         {
            TValue[] values = new TValue[_dict.Count];
            _dict.Values.CopyTo(values, 0);
            return values;
         }
      }

      #endregion

      #region ICollection<KeyValuePair<TKey,TValue>> Members

      void ICollection<KeyValuePair<TKey, TValue>>.Add(KeyValuePair<TKey, TValue> item)
      {
         this.Add(item.Key, item.Value);
      }

      /// <summary>
      /// Determines whether the key of the pair is present and its 
      /// stored value equals the value of the pair.
      /// </summary>
      bool ICollection<KeyValuePair<TKey, TValue>>.Contains(KeyValuePair<TKey, TValue> item)
      {
         TValue value;
         // Compare by value equality; a reference comparison is always
         // false for boxed value types.
         return this.TryGetValue(item.Key, out value)
            && EqualityComparer<TValue>.Default.Equals(value, item.Value);
      }

      bool ICollection<KeyValuePair<TKey, TValue>>.IsReadOnly
      {
         get { return false; }
      }

      /// <summary>
      /// Removes the pair only if the key is present and its stored 
      /// value equals the value of the pair.
      /// </summary>
      bool ICollection<KeyValuePair<TKey, TValue>>.Remove(KeyValuePair<TKey, TValue> item)
      {
         lock (_sync)
         {
            TValue value;
            if (_dict.TryGetValue(item.Key, out value)
               && EqualityComparer<TValue>.Default.Equals(value, item.Value))
            {
               return _dict.Remove(item.Key);
            }
            return false;
         }
      }

      #endregion

      #region IEnumerable Members

      IEnumerator IEnumerable.GetEnumerator()
      {
         return this.GetEnumerator();
      }

      #endregion

      #region IDictionary Members

      // The non-generic members cast their arguments to TKey / TValue;
      // an argument of an incompatible type makes that cast throw.

      void IDictionary.Add(object key, object value)
      {
         this.Add((TKey)key, (TValue)value);
      }

      bool IDictionary.Contains(object key)
      {
         return this.ContainsKey((TKey)key);
      }

      /// <summary>
      /// Returns an enumerator over a snapshot of the items whose 
      /// elements are <see cref="DictionaryEntry"/> values.
      /// </summary>
      IDictionaryEnumerator IDictionary.GetEnumerator()
      {
         Dictionary<TKey, TValue> snapshot;
         lock (_sync)
         {
            snapshot = new Dictionary<TKey, TValue>(_dict, _dict.Comparer);
         }
         // Ask the copy for its non-generic enumerator so that Current
         // yields DictionaryEntry instead of KeyValuePair.
         return ((IDictionary)snapshot).GetEnumerator();
      }

      bool IDictionary.IsFixedSize
      {
         get { return false; }
      }

      bool IDictionary.IsReadOnly
      {
         get { return false; }
      }

      ICollection IDictionary.Keys
      {
         get { return SnapshotKeys(); }
      }

      void IDictionary.Remove(object key)
      {
         Remove((TKey)key);
      }

      ICollection IDictionary.Values
      {
         get { return SnapshotValues(); }
      }

      object IDictionary.this[object key]
      {
         get { return this[(TKey)key]; }
         set { this[(TKey)key] = (TValue)value; }
      }

      #endregion

      #region ICollection Members

      void ICollection.CopyTo(Array array, int index)
      {
         lock (_sync)
         {
            ((ICollection)_dict).CopyTo(array, index);
         }
      }

      bool ICollection.IsSynchronized
      {
         get { return true; }
      }

      // Exposes the same object that Lock() and Unlock() operate on.
      object ICollection.SyncRoot
      {
         get { return _sync; }
      }

      #endregion
   }
}


As you can see in the following example, the dictionary can be used in a multi-threading environment just like any other dictionary in a single-threaded scenario.

using System;
using System.Collections.Generic;
using System.Threading;

namespace ConsoleApplication1
{
  /// <summary>
  /// Console sample that lets many thread pool work items add to and read
  /// from one shared ThreadSafeDictionary at the same time.
  /// </summary>
  /// <remarks>
  /// ThreadSafeDictionary is declared in the FR.Collections.Generic
  /// namespace; when this sample is compiled as its own file it needs a
  /// using directive for that namespace.
  /// </remarks>
  class Program
  {
    // the count of items to be added by each thread
    private static int _itemCount;
    // the count of concurrent working threads
    private static int _threadCount;
    // one ThreadSafeDictionary which will be used by all threads
    private static ThreadSafeDictionary<int, int> _dict;

    /// <summary>
    /// Queues the work items, waits until each of them has signalled its
    /// event and then waits for a key press.
    /// </summary>
    static void Main(string[] args)
    {
      // configure parameters
      _itemCount = 100000;
      _dict = new ThreadSafeDictionary<int, int>(_itemCount);
      _threadCount = 100;

      // track the working threads by an AutoResetEvent 
      // instead of old-school Thread.Sleep()
      List<AutoResetEvent> events = new List<AutoResetEvent>(_threadCount);
      WaitCallback callback = new WaitCallback(DoAsyncWork);

      for (int i = 0; i < _threadCount; i++)
      {
        // execution information
        int id = i;
        AutoResetEvent autoEvent = new AutoResetEvent(false);
        // remember the AutoResetEvent which will be set by the 
        // worker thread when done
        events.Add(autoEvent);

        // run the worker on the ThreadPool
        ThreadPool.QueueUserWorkItem(
        callback,
        new object[] { id, autoEvent }
        );
      }

      // wait until all threads are finished
      foreach (var autoEvent in events)
        autoEvent.WaitOne();

      Console.WriteLine();
      Console.WriteLine("Finished");
      Console.ReadKey();
    }

    /// <summary>
    /// Work item body: adds and reads the worker's own key range, then adds
    /// the last 100 keys of that range while holding the dictionary lock.
    /// </summary>
    /// <param name="obj">
    /// An object array holding the worker id (int) and the 
    /// AutoResetEvent to set when the work is done.
    /// </param>
    static void DoAsyncWork(object obj)
    {
      // get arguments from input parameter
      object[] args = (object[])obj;
      int id = (int)args[0];
      AutoResetEvent autoEvent = (AutoResetEvent)args[1];
      // each thread works with its own key range
      // [id * _itemCount, (id + 1) * _itemCount)
      int start = id * _itemCount;

      try
      {
        Console.WriteLine("Enter {0}", id);

        // use the ThreadSafeDictionary in a multi-threaded
        // environment like any other dictionary

        // add items
        int end = _itemCount - 100;
        for (int i = 0; i < end; i++)
        {
          _dict.Add(i + start, i);
        }

        // read the items back
        for (int i = 0; i < end; i++)
        {
          int j = _dict[i + start];
        }

        // set an exclusive lock. All other threads will wait
        _dict.Lock();
        try
        {
          Console.WriteLine("-> Lock from {0}", id);
          // keep using this worker's own offset so the keys stay unique
          // across workers; a shared offset would make Add throw on the
          // duplicate keys
          for (int i = end; i < _itemCount; i++)
          {
            _dict.Add(i + start, i);
          }
          Console.WriteLine("<- Unlock from {0}", id);
        }
        finally
        {
          // never leave the dictionary locked if an Add fails
          _dict.Unlock();
        }

        Console.WriteLine("Exit {0}", id);
      }
      finally
      {
        // inform caller that work is done
        autoEvent.Set();
      }
    }
  }
}


Hope this class will be helpful for some of you guys and gals! :-)

Flo

No comments:

Post a Comment

Note: Only a member of this blog may post a comment.