Подтвердить что ты не робот

Как я могу дублировать тип объединенного типа F # в С#?

Я создал новый класс Actor, который обрабатывает переданные ему сообщения. Проблема, с которой я сталкиваюсь, - это выяснить, что является самым элегантным способом передачи связанных, но разных сообщений Актеру. Моя первая идея - использовать наследование, но кажется настолько раздутой, но это сильно типы, которые являются определенным требованием.

Есть идеи?

Пример

private abstract class QueueMessage { }

private class ClearMessage : QueueMessage 
{
    public static readonly ClearMessage Instance = new ClearMessage();

    private ClearMessage() { }
}

private class TryDequeueMessage : QueueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();

    private TryDequeueMessage() { }
}

private class EnqueueMessage : QueueMessage 
{
    public TValue Item { get; private set; }

    private EnqueueMessage(TValue item)
    {
        Item = item;
    }
}

Класс актера

/// <summary>Represents a callback method to be executed by an Actor.</summary>
/// <typeparam name="TReply">The type of reply.</typeparam>
/// <param name="reply">The reply made by the actor.</param>
public delegate void ActorReplyCallback<TReply>(TReply reply);

/// <summary>Represents an Actor which receives and processes messages in concurrent applications.</summary>
/// <typeparam name="TMessage">The type of message this actor accepts.</typeparam>
/// <typeparam name="TReply">The type of reply made by this actor.</typeparam>
public abstract class Actor<TMessage, TReply> : IDisposable
{
    /// <summary>The default total number of threads to process messages.</summary>
    private const Int32 DefaultThreadCount = 1;


    /// <summary>Used to serialize access to the message queue.</summary>
    private readonly Locker Locker;

    /// <summary>Stores the messages until they can be processed.</summary>
    private readonly System.Collections.Generic.Queue<Message> MessageQueue;

    /// <summary>Signals the actor thread to process a new message.</summary>
    private readonly ManualResetEvent PostEvent;

    /// <summary>This tells the actor thread to stop reading from the queue.</summary>
    private readonly ManualResetEvent DisposeEvent;

    /// <summary>Processes the messages posted to the actor.</summary>
    private readonly List<Thread> ActorThreads;


    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    public Actor() : this(DefaultThreadCount) { }

    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    /// <param name="thread_count"></param>
    public Actor(Int32 thread_count)
    {
        if (thread_count < 1) throw new ArgumentOutOfRangeException("thread_count", thread_count, "Must be 1 or greater.");

        Locker = new Locker();
        MessageQueue = new System.Collections.Generic.Queue<Message>();
        EnqueueEvent = new ManualResetEvent(true);
        PostEvent = new ManualResetEvent(false);
        DisposeEvent = new ManualResetEvent(true);
        ActorThreads = new List<Thread>();

        for (Int32 i = 0; i < thread_count; i++)
        {
            var thread = new Thread(ProcessMessages);
            thread.IsBackground = true;
            thread.Start();
            ActorThreads.Add(thread);
        }
    }


    /// <summary>Posts a message and waits for the reply.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <returns>The reply from the actor.</returns>
    public TReply PostWithReply(TMessage message)
    {
        using (var wrapper = new Message(message))
        {
            lock (Locker) MessageQueue.Enqueue(wrapper);
            PostEvent.Set();
            wrapper.Channel.CompleteEvent.WaitOne();
            return wrapper.Channel.Value;
        }
    }

    /// <summary>Posts a message to the actor and executes the callback when the reply is received.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <param name="callback">The callback that will be invoked once the replay is received.</param>
    public void PostWithAsyncReply(TMessage value, ActorReplyCallback<TReply> callback)
    {
        if (callback == null) throw new ArgumentNullException("callback");
        ThreadPool.QueueUserWorkItem(state => callback(PostWithReply(value)));
    }

    /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
    public void Dispose()
    {
        if (DisposeEvent.WaitOne(10))
        {
            DisposeEvent.Reset();
            PostEvent.Set();

            foreach (var thread in ActorThreads)
            {
                thread.Join();
            }

            ((IDisposable)PostEvent).Dispose();
            ((IDisposable)DisposeEvent).Dispose();
        }
    }

    /// <summary>Processes a message posted to the actor.</summary>
    /// <param name="message">The message to be processed.</param>
    protected abstract void ProcessMessage(Message message);

    /// <summary>Dequeues the messages passes them to ProcessMessage.</summary>
    private void ProcessMessages()
    {
        while (PostEvent.WaitOne() && DisposeEvent.WaitOne(10))
        {
            var message = (Message)null;

            while (true)
            {
                lock (Locker)
                {
                    message = MessageQueue.Count > 0 ?
                        MessageQueue.Dequeue() :
                        null;

                    if (message == null)
                    {
                        PostEvent.Reset();
                        break;
                    }
                }

                try
                {
                    ProcessMessage(message);
                }
                catch
                {

                }
            }
        }
    }


    /// <summary>Represents a message that is passed to an actor.</summary>
    protected class Message : IDisposable
    {
        /// <summary>The actual value of this message.</summary>
        public TMessage Value { get; private set; }

        /// <summary>The channel used to give a reply to this message.</summary>
        public Channel Channel { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Message class.</summary>
        /// <param name="value">The actual value of the message.</param>
        public Message(TMessage value)
        {
            Value = value;
            Channel = new Channel();
        }


        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            Channel.Dispose();
        }
    }

    /// <summary>Represents a channel used by an actor to reply to a message.</summary>         
    protected class Channel : IDisposable
    {
        /// <summary>The value of the reply.</summary>
        public TReply Value { get; private set; }

        /// <summary>Signifies that the message has been replied to.</summary>
        public ManualResetEvent CompleteEvent { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Channel class.</summary>
        public Channel()
        {
            CompleteEvent = new ManualResetEvent(false);
        }

        /// <summary>Reply to the message received.</summary>
        /// <param name="value">The value of the reply.</param>
        public void Reply(TReply value)
        {
            Value = value;
            CompleteEvent.Set();
        }

        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            ((IDisposable)CompleteEvent).Dispose();
        }
    }
}
4b9b3361

Ответ 1

В вашем примере кода вы реализуете PostWithAsyncReply в терминах PostWithReply. Это не идеально, потому что это означает, что когда вы вызываете PostWithAsyncReply, и актеру требуется некоторое время, чтобы справиться с ним, на самом деле есть два потока: один, исполняющий актер, и тот, кто ждет его завершения. Было бы лучше, если бы один поток выполнял действие, а затем вызывал обратный вызов в асинхронном случае. (Очевидно, что в синхронном случае не избежать связывания двух потоков).

Update:

Подробнее об этом: вы создаете актера с аргументом, сообщающим ему, сколько потоков будет выполняться. Для простоты предположим, что каждый актер работает с одним потоком (на самом деле довольно хорошая ситуация, потому что актеры могут иметь внутреннее состояние без блокировки на нем, поскольку только один поток обращается к нему напрямую).

Актер A вызывает актера B, ожидая ответа. Чтобы обработать запрос, актеру B нужно вызвать актера C. Итак, теперь ждут только потоки A и B, а C - единственный, который фактически дает процессору любую работу. Так много для многопоточности! Но это то, что вы получаете, если вы ждете ответов все время.

Хорошо, вы можете увеличить количество потоков, которые вы запускаете в каждом акторе. Но вы начнете их, чтобы они могли сидеть без дела. Стек использует много памяти, а переключение контекста может быть дорогостоящим.

Итак, лучше отправлять сообщения асинхронно, с механизмом обратного вызова, чтобы вы могли получить результат. Проблема с вашей реализацией заключается в том, что вы захватываете еще один поток из пула потоков, чтобы просто сидеть и ждать. Таким образом, вы в основном применяете обходное решение для увеличения количества потоков. Вы выделяете поток для задачи никогда не запускаться.

Лучше реализовать PostWithReply в терминах PostWithAsyncReply, т.е. наоборот. Асинхронная версия является низкоуровневой. Основываясь на моем примере на основе делегата (потому что это связано с меньшим набором кода!):

private bool InsertCoinImpl(int value) 
{
    // only accept dimes/10p/whatever it is in euros
    return (value == 10);
}

public void InsertCoin(int value, Action<bool> accepted)
{
    Submit(() => accepted(InsertCoinImpl(value)));
}

Таким образом, частная реализация возвращает bool. Открытый асинхронный метод принимает действие, которое получит возвращаемое значение; как частная реализация, так и действие обратного вызова выполняются в одном потоке.

Надеюсь, что ждать синхронного ожидания будет случай меньшинства. Но когда вам это нужно, он может быть предоставлен вспомогательным методом, полностью общим назначением и не привязан к какому-либо конкретному актору или типу сообщения:

public static T Wait<T>(Action<Action<T>> activity)
{
    T result = default(T);
    var finished = new EventWaitHandle(false, EventResetMode.AutoReset);

    activity(r =>
        {
            result = r;
            finished.Set();
        });

    finished.WaitOne();
    return result;
}

Итак, теперь у какого-то другого актера можно сказать:

bool accepted = Helpers.Wait<bool>(r => chocMachine.InsertCoin(5, r));

Аргумент типа Wait может быть ненужным, не пробовал компилировать его. Но Wait в основном магия - обратный вызов для вас, поэтому вы можете передать его на какой-то асинхронный метод, а снаружи вы просто возвращаете все, что было передано обратному вызову в качестве возвращаемого значения. Обратите внимание, что lambda, которую вы передаете в Wait, по-прежнему выполняется в том же потоке, который называется Wait.

Теперь мы возвращаем вас в нашу обычную программу...

Что касается актуальной проблемы, о которой вы просили, вы отправляете сообщение актеру, чтобы заставить его что-то сделать. Здесь полезны делегаты. Они позволяют эффективно компилировать вам класс с некоторыми данными, конструктором, который вам даже не нужно вызывать явно, а также методом. Если вам нужно написать кучу маленьких классов, переключитесь на делегатов.

abstract class Actor
{
    Queue<Action> _messages = new Queue<Action>();

    protected void Submit(Action action)
    {
        // take out a lock of course
        _messages.Enqueue(action);
    }

    // also a "run" that reads and executes the 
    // message delegates on background threads
}

Теперь конкретный производный актер следует этому шаблону:

class ChocolateMachineActor : Actor
{
    private void InsertCoinImpl(int value) 
    {
        // whatever...
    }

    public void InsertCoin(int value)
    {
        Submit(() => InsertCoinImpl(value));
    }
}

Итак, чтобы отправить сообщение актеру, вы просто вызываете общедоступные методы. Частный метод Impl выполняет реальную работу. Нет необходимости писать кучу классов сообщений вручную.

Очевидно, что я отказался от ответа на вопрос, но все это можно сделать с большим количеством параметров. (См. Обновление выше).

Ответ 2

Стив Гилхам обобщил, как компилятор фактически обрабатывает дискриминационные союзы. Для вашего собственного кода вы можете рассмотреть упрощенную версию этого. Учитывая следующее F #:

type QueueMessage<T> = ClearMessage | TryDequeueMessage | EnqueueMessage of T

Здесь один из способов эмулировать его в С#:

public enum MessageType { ClearMessage, TryDequeueMessage, EnqueueMessage }

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract MessageType MessageType { get; }

    /// <summary>
    /// Only applies to EnqueueMessages
    /// </summary>
    public abstract T Item { get; }

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }


    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.ClearMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.TryDequeueMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override MessageType MessageType
        {
            get { return MessageType.EnqueueMessage; }
        }

        /// <summary>
        /// Gets the item to be enqueued
        /// </summary>
        public override T Item { get { return item; } }
    }
}

Теперь, в коде, которому присваивается QueueMessage, вы можете включить свойство MessageType вместо соответствия шаблону и убедиться, что вы получили доступ к свойству Item только на EnqueueMessage s.

ИЗМЕНИТЬ

Вот еще одна альтернатива, основанная на коде Джульетты. Я попытался упорядочить все, чтобы получить более удобный интерфейс с С#. Это предпочтительнее предыдущей версии, поскольку вы не можете получить исключение MethodNotImplemented.

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase);

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }

    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return clearCase();
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return tryDequeueCase();
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return enqueueCase(item);
        }
    }
}

Вы должны использовать этот код следующим образом:

public class MessageUserTest
{
    public void Use()
    {
        // your code to get a message here...
        QueueMessage<string> msg = null; 

        // emulate pattern matching, but without constructor names
        int i =
            msg.Match(
                clearCase:      () => -1,
                tryDequeueCase: () => -2,
                enqueueCase:     s =>  s.Length);
    }
}

Ответ 3

Типы соединений и сопоставление соответствия шаблонов довольно прямо к шаблону посетителя, я опубликовал об этом несколько раз раньше:

Итак, если вы хотите передавать сообщения с большим количеством разных типов, вы застряли в реализации шаблона посетителя.

(Предупреждение, непроверенный код впереди, но должен дать вам представление о его выполнении)

Скажем, у нас есть что-то вроде этого:

type msg =
    | Add of int
    | Sub of int
    | Query of ReplyChannel<int>


let rec counts = function
    | [] -> (0, 0, 0)
    | Add(_)::xs -> let (a, b, c) = counts xs in (a + 1, b, c)
    | Sub(_)::xs -> let (a, b, c) = counts xs in (a, b + 1, c)
    | Query(_)::xs -> let (a, b, c) = counts xs in (a, b, c + 1)

В итоге вы получите этот громоздкий код С#:

interface IMsgVisitor<T>
{
    T Visit(Add msg);
    T Visit(Sub msg);
    T Visit(Query msg);
}

abstract class Msg
{
    public abstract T Accept<T>(IMsgVistor<T> visitor)
}

class Add : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Sub : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Query : Msg
{
    public readonly ReplyChannel<int> Value;
    public Add(ReplyChannel<int> value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

Теперь, когда вы хотите что-то сделать с сообщением, вам нужно реализовать посетителя:

class MsgTypeCounter : IMsgVisitor<MsgTypeCounter>
{
    public readonly Tuple<int, int, int> State;    

    public MsgTypeCounter(Tuple<int, int, int> state) { this.State = state; }

    public MsgTypeCounter Visit(Add msg)
    {
        Console.WriteLine("got Add of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(1 + State.Item1, State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Sub msg)
    {
        Console.WriteLine("got Sub of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Query msg)
    {
        Console.WriteLine("got Query of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }
}

Наконец, вы можете использовать его следующим образом:

var msgs = new Msg[] { new Add(1), new Add(3), new Sub(4), new ReplyChannel(null) };
var counts = msgs.Aggregate(new MsgTypeVisitor(Tuple.Create(0, 0, 0)),
    (acc, x) => x.Accept(acc)).State;

Да, это как бы тупо, как кажется, но то, как вы передаете несколько сообщений классу безопасным образом, а также почему мы не реализуем объединения в С#;)

Ответ 4

Длинный снимок, но в любом случае..

Я предполагаю, что дискриминационный союз - это F # для ADT (абстрактный тип данных). Это означает, что тип может быть одним из нескольких факторов.

Если есть два, вы можете попробовать и поместить его в простой общий класс с двумя параметрами типа:

 public struct DiscriminatedUnion<T1,T2>
 {   
      public DiscriminatedUnion(T1 t1) { value = t1; }
      public DiscriminatedUnion(T2 t1) { value = t2; }


      public static implicit operator T1(DiscriminatedUnion<T1,T2> du) {return (T1)du.value; }
      public static implicit operator T2(DiscriminatedUnion<T1,T2> du) {return (T2)du.value; }

      object value;
 }

Чтобы заставить его работать 3 или более, нам нужно повторить этот класс несколько раз. У любого есть решение для перегрузки функций в зависимости от типа времени выполнения?

Ответ 5

Если у вас есть этот

type internal Either<'a, 'b> =
  | Left of 'a
  | Right of 'b

в F #, то эквивалент С# для CLR, сгенерированный для класса Either<'a, 'b>, имеет внутренние типы, такие как

internal  class _Left : Either<a, b>
{
     internal readonly a left1;
     internal _Left(a left1);
}

каждый с тегом, методом getter и factory

internal const  int tag_Left = 0;
internal static  Either<a, b> Left(a Left1);
internal a Left1 {  get; }

плюс дискриминатор

internal int  Tag { get; }

и множество методов для реализации интерфейсов IStructuralEquatable, IComparable, IStructuralComparable

Ответ 6

В классе существует дискретный тип объединения, установленный во время компиляции, в .

private class ClearMessage
{
    public static readonly ClearMessage Instance = new ClearMessage();    
    private ClearMessage() { }
}

private class TryDequeueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();    
    private TryDequeueMessage() { }
}

private class EnqueueMessage
{
    public TValue Item { get; private set; }    
    private EnqueueMessage(TValue item) { Item = item; }
}

Использование дискриминированного объединения может быть выполнено следующим образом:

// New file
// Create an alias
using Message = Union<ClearMessage, TryDequeueMessage, EnqueMessage>;

int ProcessMessage(Message msg)
{
   return Message.Match(
      clear => 1,
      dequeue => 2,
      enqueue => 3);
}