programing tip

차단 대기열 만들기

itbloger 2020. 6. 2. 08:26
반응형

차단 대기열 만들기 .NET에서?


대기열에 여러 스레드를 추가하고 동일한 대기열에서 여러 스레드를 읽는 시나리오가 있습니다. 대기열이 특정 크기에 도달 하면 대기열에서 항목을 제거 할 때까지 대기열을 채우는 모든 스레드 가 추가시 차단됩니다.

아래 해결책은 현재 사용중인 것이며 내 질문은 : 어떻게 개선 할 수 있습니까? 사용해야하는 BCL에서이 동작을 이미 활성화 한 개체가 있습니까?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}

그것은 매우 안전하지 않은 것처럼 보입니다 (매우 작은 동기화). 어떻습니까 :

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(편집하다)

실제로, 독자는 부울 플래그와 같이 독자가 깨끗하게 종료하기 시작할 수 있도록 큐를 닫는 방법을 원할 것입니다.

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}

.net 4 BlockingCollection을 사용하여 Add ()를 대기열에 넣고 Take ()를 대기열에 넣습니다. 내부적으로 비 차단 ConcurrentQueue를 사용합니다. 자세한 내용은 여기를 클릭하십시오. 최고 및 빠른 생산자 / 소비자 대기열 기술 BlockingCollection vs 동시 대기열


"어떻게 개선 할 수 있습니까?"

글쎄, 클래스의 모든 메소드를 살펴보고 다른 스레드가 해당 메소드 또는 다른 메소드를 동시에 호출하면 어떻게 될지 고려해야합니다. 예를 들어, Add 메서드가 아닌 Remove 메서드에는 잠금을 설정합니다. 한 스레드가 다른 스레드 제거와 동시에 추가되면 어떻게됩니까? 나쁜 것들.

또한 메소드가 첫 번째 오브젝트의 내부 데이터 (예 : GetEnumerator)에 대한 액세스를 제공하는 두 번째 오브젝트를 리턴 할 수 있음을 고려하십시오. 한 스레드가 해당 열거자를 통과하고 다른 스레드가 목록을 동시에 수정한다고 가정하십시오. 안좋다.

경험상 가장 좋은 방법은 클래스의 메소드 수를 절대 최소값으로 줄여서 더 간단하게 만드는 것입니다.

특히 다른 컨테이너 클래스를 상속하지 마십시오. 클래스의 모든 메소드를 노출시켜 호출자가 내부 데이터를 손상 시키거나 데이터가 부분적으로 완전히 변경된 것을 볼 수있는 방법을 제공하기 때문에 해당 시점에 손상된 것으로 표시됨). 모든 세부 사항을 숨기고 액세스 방법을 완전히 무자비하십시오.

기성품 솔루션을 사용하는 것이 좋습니다. 스레딩에 대한 책을 얻거나 타사 라이브러리를 사용하십시오. 그렇지 않으면 시도중인 것을 감안할 때 오랫동안 코드를 디버깅 할 것입니다.

또한 Remove가 호출자가 특정 항목을 선택하는 대신 항목 (예 : 대기열이므로 처음 추가 된 항목)을 반환하는 것이 더 의미가 없습니까? 대기열이 비어 있으면 Remove도 차단해야합니다.

업데이트 : Marc의 답변은 실제로 이러한 모든 제안을 구현합니다! :) 그러나 그의 버전이 왜 그렇게 개선되었는지 이해하는 것이 도움이 될 수 있으므로 여기에 남겨 두겠습니다.


System.Collections.Concurrent 네임 스페이스에서 BlockingCollectionConcurrentQueue사용할 수 있습니다.

 public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
    /// <summary>
    /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
    /// </summary>
    public ProducerConsumerQueue()  
        : base(new ConcurrentQueue<T>())
    {
    }

  /// <summary>
  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
  /// </summary>
  /// <param name="maxSize"></param>
    public ProducerConsumerQueue(int maxSize)
        : base(new ConcurrentQueue<T>(), maxSize)
    {
    }



}

I just knocked this up using the Reactive Extensions and remembered this question:

public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext();
        return _enumerator.Current;
    }
}

Not necessarily entirely safe, but very simple.


This is what I came op for a thread safe bounded blocking queue.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

public class BlockingBuffer<T>
{
    private Object t_lock;
    private Semaphore sema_NotEmpty;
    private Semaphore sema_NotFull;
    private T[] buf;

    private int getFromIndex;
    private int putToIndex;
    private int size;
    private int numItems;

    public BlockingBuffer(int Capacity)
    {
        if (Capacity <= 0)
            throw new ArgumentOutOfRangeException("Capacity must be larger than 0");

        t_lock = new Object();
        buf = new T[Capacity];
        sema_NotEmpty = new Semaphore(0, Capacity);
        sema_NotFull = new Semaphore(Capacity, Capacity);
        getFromIndex = 0;
        putToIndex = 0;
        size = Capacity;
        numItems = 0;
    }

    public void put(T item)
    {
        sema_NotFull.WaitOne();
        lock (t_lock)
        {
            while (numItems == size)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            buf[putToIndex++] = item;

            if (putToIndex == size)
                putToIndex = 0;

            numItems++;

            Monitor.Pulse(t_lock);

        }
        sema_NotEmpty.Release();


    }

    public T take()
    {
        T item;

        sema_NotEmpty.WaitOne();
        lock (t_lock)
        {

            while (numItems == 0)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            item = buf[getFromIndex++];

            if (getFromIndex == size)
                getFromIndex = 0;

            numItems--;

            Monitor.Pulse(t_lock);

        }
        sema_NotFull.Release();

        return item;
    }
}

I haven't fully explored the TPL but they might have something that fits your needs, or at the very least, some Reflector fodder to snag some inspiration from.

Hope that helps.


Well, you might look at System.Threading.Semaphore class. Other than that - no, you have to make this yourself. AFAIK there is no such built-in collection.


If you want maximum throughput, allowing multiple readers to read and only one writer to write, BCL has something called ReaderWriterLockSlim that should help slim down your code...

참고URL : https://stackoverflow.com/questions/530211/creating-a-blocking-queuet-in-net

반응형