Подтвердить что ты не робот

Redis failover с StackExchange/Sentinel от С#

В настоящее время мы используем Redis 2.8.4 и StackExchange.Redis(и любим его), но на данный момент не имеем никакой защиты от сбоев оборудования и т.д. Я пытаюсь заставить решение работать, в котором у нас есть мастер/ведомые и контрольный контроль, но не могут получить его, и я не могу найти никаких реальных указателей после поиска.

Итак, в настоящее время мы получили это:

У нас есть 3 сервера redis и дозорный сигнал на каждом node (настройка ребятами Linux): devredis01: 6383 (мастер) devredis02: 6383 (slave) devredis03: 6383 (slave) devredis01: 26379 (дозорный) devredis02: 26379 (дозорный) devredis03: 26379 (дозорный)

Я могу подключить клиента StackExchange к серверам redis и написать/прочитать и убедиться, что данные реплицируются во всех экземплярах redis с помощью Redis Desktop Manager.

Я также могу подключиться к службам часового с помощью другого ConnectionMultiplexer, запросить конфигурацию, запросить мастер redis node, запросить ведомые устройства и т.д.

Мы также можем убить мастер redis node и убедиться, что один из ведомых продвинут на master и репликация на другой подчиненный продолжает работать. Мы можем наблюдать соединение redis, пытающееся восстановить соединение с мастером, а также, если я заново создаю ConnectionMultiplexer, я могу писать/читать снова вновь создаваемому ведущему и читать из подчиненного устройства.

До сих пор так хорошо!

Бит, который мне не хватает, заключается в том, как вы объединяете все это в производственную систему?

Должен ли я получать конечные точки redis от часового и использовать 2 ConnectionMultiplexers? Что именно мне нужно сделать, чтобы обнаружить, что node опустился? Может ли StackExchange сделать это для меня автоматически или передать событие, чтобы я мог повторно подключить свой redis ConnectionMultiplexer? Должен ли я обрабатывать событие ConnectionFailed, а затем снова подключаться, чтобы ConnectionMuliplexer смог узнать, что нового мастера? Предположительно, когда я снова подключусь, любые попытки написать будут потеряны?

Надеюсь, я не пропущу что-то очень очевидное здесь. Я просто изо всех сил пытаюсь собрать все это.

Спасибо заранее!

4b9b3361

Ответ 1

Я просто задал этот вопрос и нашел аналогичный вопрос для вашего и моего, который, как мне кажется, отвечает на вопрос о том, как теперь наш код (клиент) знает, что является новым главным сервером, когда текущий мастер идет вниз?

Как сообщить клиенту, что новый мастер Redis использует Sentinel

По-видимому, вам просто нужно подписаться и послушать события от Стражей. Имеет смысл. Я просто подумал, что есть более оптимизированный способ.

Я прочитал кое-что о Twemproxy для Linux, который действует как прокси-сервер и, вероятно, делает это для вас? Но я был на redis для Windows и пытался найти вариант Windows. Мы могли бы просто переместиться в Linux, если это одобренный способ сделать это.

Ответ 2

На прошлой неделе мне удалось провести некоторое время с сценариями тестирования парней Linux и работать со стороной С# для этой реализации, и я использую следующий подход:

  • Прочитайте адреса отправителя из конфигурации и создайте ConnectionMultiplexer для подключения к ним.
  • Подпишитесь на канал + master-переключателя
  • Задайте каждому серверу-отправителю, в свою очередь, то, что, по их мнению, мастер redis и slaves, сравните их все, чтобы убедиться, что все согласны.
  • Создайте новый ConnectionMultiplexer с адресами redis-сервера, которые считываются с отправителя и подключаются, добавьте обработчик событий в ConnectionFailed и ConnectionRestored.
  • Когда я получаю сообщение + master-master +, вызываю команду Configure() на redis ConnectionMultiplexer
  • В качестве подхода к поясу и фигурным скобкам я всегда вызываю Configure() на redis ConnectionMultiplexer через 12 секунд после получения события connectionFailed или connectionRestored, когда тип соединения - ConnectionType.Interactive.

Я нахожу, что обычно я работаю и переконфигурировался примерно через 5 секунд, потеряв мастер redis. За это время я не могу писать, но я могу читать (так как вы можете читать раба). 5 секунд в порядке для нас, так как наши данные обновляются очень быстро и становятся устаревшими через несколько секунд (и впоследствии перезаписываются).

Одна вещь, о которой я не был уверен, заключалась в том, следует ли мне удалить redis-сервер из redis ConnectionMultiplexer, когда экземпляр отключен, или пусть он продолжит повторное соединение. Я решил оставить его повторно, поскольку он возвращается в микс как раб, как только он возвращается. Я провел некоторое тестирование производительности с повторным подключением и без него, и это, казалось, не имело большого значения. Может быть, кто-то может прояснить, является ли это правильным.

Время от времени возвращение экземпляра, который ранее был мастером, по-видимому, вызывал некоторую путаницу - через несколько секунд после его возвращения я получал исключение из письма - "READONLY", предполагая, что я не могу написать раб. Это было редко, но я обнаружил, что мой "уловный" подход вызова Configure() через 12 секунд после изменения состояния соединения поймал эту проблему. Вызов Configure() кажется очень дешевым и поэтому вызывает его дважды, независимо от того, нужно ли это было или нет.

Теперь, когда у меня есть подчиненные, я выгрузил часть кода очистки данных, который выполняет ключевые проверки для подчиненных, что делает меня счастливым.

В целом я довольно доволен, он не идеален, но для чего-то, что очень редко бывает, это более чем достаточно.

Ответ 3

Я включаю нашу оболочку Redis, она несколько изменилась с оригинального ответа по разным причинам:

  • Мы хотели использовать pub/sub
  • Sentinel не всегда показывал нам сообщение с измененным сообщением в "правильное" время (то есть мы его вызывали Configure() и в конечном итоге думали, что подчиненный был мастером)
  • ConnectionMultiplexer не всегда восстанавливал связи каждый раз, затрагивая pub/sub

Я скорее подозреваю, что это больше, чем наша конфигурация sentinel/redis. В любом случае, это просто не было абсолютно надежным, несмотря на деструктивное тестирование. В дополнение к которому мастер изменил сообщение, потребовалось много времени с тех пор, как нам пришлось увеличить тайм-ауты из-за того, что часовое "слишком чувствительное" и вызвало отказы при отсутствии каких-либо изменений. Я думаю, что работа в виртуальной среде также усугубляет проблему.

Вместо того, чтобы прослушивать подписки, мы просто пытаемся выполнить тест на запись каждые 5 секунд, а также получаем "последнее сообщение, полученное" для pub/sub. Если мы столкнемся с какими-либо проблемами, мы полностью уничтожим связи и перестроим их. Это кажется излишним, но на самом деле это довольно быстро и все же быстрее, чем ждать, когда мастер изменил сообщение от дозорного...

Это не будет компилироваться без различных методов расширения и других классов и т.д., но вы получите эту идею.

namespace Smartodds.Framework.Redis
{
    public class RedisClient : IDisposable
    {
        public RedisClient(RedisEnvironmentElement environment, Int32 databaseId)
        {
            m_ConnectTimeout = environment.ConnectTimeout;
            m_Timeout = environment.Timeout;
            m_DatabaseId = databaseId;
            m_ReconnectTime = environment.ReconnectTime;
            m_CheckSubscriptionsTime = environment.CheckSubscriptions;
            if (environment.TestWrite == true)
            {
                m_CheckWriteTime = environment.TestWriteTime;
            }

            environment.Password.ToCharArray().ForEach((c) => m_Password.AppendChar(c));

            foreach (var server in environment.Servers)
            {
                if (server.Type == ServerType.Redis)
                {
                    // will be ignored if sentinel servers are used
                    m_RedisServers.Add(new RedisConnection { Address = server.Host, Port = server.Port });
                }
                else
                {
                    m_SentinelServers.Add(new RedisConnection { Address = server.Host, Port = server.Port });
                }
            }
        }

        public bool IsSentinel { get { return m_SentinelServers.Count > 0; } }

        public IDatabase Database { get { return _Redis.GetDatabase(m_DatabaseId); } }

        private ConnectionMultiplexer _Redis
        {
            get
            {
                if (m_Connecting == true)
                {
                    throw new RedisConnectionNotReadyException();
                }

                ConnectionMultiplexer redis = m_Redis;
                if (redis == null)
                {
                    throw new RedisConnectionNotReadyException();
                }

                return redis;
            }
        }

        private ConnectionMultiplexer _Sentinel
        {
            get
            {
                if (m_Connecting == true)
                {
                    throw new RedisConnectionNotReadyException("Sentinel connection not ready");
                }

                ConnectionMultiplexer sentinel = m_Sentinel;
                if (sentinel == null)
                {
                    throw new RedisConnectionNotReadyException("Sentinel connection not ready");
                }

                return sentinel;
            }
        }

        public void RegisterSubscription(string channel, Action<RedisChannel, RedisValue> handler, Int32 maxNoReceiveSeconds)
        {
            m_Subscriptions.Add(channel, new RedisSubscription
            {
                Channel = channel,
                Handler = handler,
                MaxNoReceiveSeconds = maxNoReceiveSeconds,
                LastUsed = DateTime.UtcNow,
            });
        }

        public void Connect()
        {
            _Connect(true);
        }

        private void _Connect(object state)
        {
            bool throwException = (bool)state;

            // if a reconnect is already being attempted, don't hang around waiting
            if (Monitor.TryEnter(m_ConnectionLocker) == false)
            {
                return;
            }

            // we took the lock, notify everything we are connecting
            m_Connecting = true;

            try
            {
                Stopwatch sw = Stopwatch.StartNew();
                LoggerQueue.Debug(">>>>>> REDIS CONNECTING... >>>>>>");

                // if this is a reconnect, make absolutely sure everything is cleaned up first
                _KillTimers();
                _KillRedisClient();

                if (this.IsSentinel == true && m_Sentinel == null)
                {
                    LoggerQueue.Debug(">>>>>> CONNECTING TO SENTINEL >>>>>> - " + sw.Elapsed);

                    // we'll be getting the redis servers from sentinel
                    ConfigurationOptions sentinelConnection = _CreateRedisConfiguration(CommandMap.Sentinel, null, m_SentinelServers);
                    m_Sentinel = ConnectionMultiplexer.Connect(sentinelConnection);
                    LoggerQueue.Debug(">>>>>> CONNECTED TO SENTINEL >>>>>> - " + sw.Elapsed);

                    _OutputConfigurationFromSentinel();

                    // get all the redis servers from sentinel and ignore any set by caller
                    m_RedisServers.Clear();
                    m_RedisServers.AddRange(_GetAllRedisServersFromSentinel());

                    if (m_RedisServers.Count == 0)
                    {
                        throw new RedisException("Sentinel found no redis servers");
                    }
                }

                LoggerQueue.Debug(">>>>>> CONNECTING TO REDIS >>>>>> - " + sw.Elapsed);

                // try to connect to all redis servers
                ConfigurationOptions connection = _CreateRedisConfiguration(CommandMap.Default, _SecureStringToString(m_Password), m_RedisServers);
                m_Redis = ConnectionMultiplexer.Connect(connection);
                LoggerQueue.Debug(">>>>>> CONNECTED TO REDIS >>>>>> - " + sw.Elapsed);

                // register subscription channels
                m_Subscriptions.ForEach(s =>
                {
                    m_Redis.GetSubscriber().Subscribe(s.Key, (channel, value) => _SubscriptionHandler(channel, value));
                    s.Value.LastUsed = DateTime.UtcNow;
                });

                if (this.IsSentinel == true)
                {
                    // check subscriptions have been sending messages
                    if (m_Subscriptions.Count > 0)
                    {
                        m_CheckSubscriptionsTimer = new Timer(_CheckSubscriptions, null, 30000, m_CheckSubscriptionsTime);
                    }

                    if (m_CheckWriteTime != null)
                    {
                        // check that we can write to redis
                        m_CheckWriteTimer = new Timer(_CheckWrite, null, 32000, m_CheckWriteTime.Value);
                    }

                    // monitor for connection status change to any redis servers
                    m_Redis.ConnectionFailed += _ConnectionFailure;
                    m_Redis.ConnectionRestored += _ConnectionRestored;
                }

                LoggerQueue.Debug(string.Format(">>>>>> ALL REDIS CONNECTED ({0}) >>>>>>", sw.Elapsed));
            }
            catch (Exception ex)
            {
                LoggerQueue.Error(">>>>>> REDIS CONNECT FAILURE >>>>>>", ex);

                if (throwException == true)
                {
                    throw;
                }
                else
                {
                    // internal reconnect, the reconnect has failed so might as well clean everything and try again
                    _KillTimers();
                    _KillRedisClient();

                    // faster than usual reconnect if failure
                    _ReconnectTimer(1000);
                }
            }
            finally
            {
                // finished connection attempt, notify everything and remove lock
                m_Connecting = false;
                Monitor.Exit(m_ConnectionLocker);
            }
        }

        private ConfigurationOptions _CreateRedisConfiguration(CommandMap commandMap, string password, List<RedisConnection> connections)
        {
            ConfigurationOptions connection = new ConfigurationOptions
            {
                CommandMap = commandMap,
                AbortOnConnectFail = true,
                AllowAdmin = true,
                ConnectTimeout = m_ConnectTimeout,
                SyncTimeout = m_Timeout,
                ServiceName = "master",
                TieBreaker = string.Empty,
                Password = password,
            };

            connections.ForEach(s =>
            {
                connection.EndPoints.Add(s.Address, s.Port);
            });

            return connection;
        }

        private void _OutputConfigurationFromSentinel()
        {
            m_SentinelServers.ForEach(s =>
            {
                try
                {
                    IServer server = m_Sentinel.GetServer(s.Address, s.Port);
                    if (server.IsConnected == true)
                    {
                        try
                        {
                            IPEndPoint master = server.SentinelGetMasterAddressByName("master") as IPEndPoint;
                            var slaves = server.SentinelSlaves("master");

                            StringBuilder sb = new StringBuilder();
                            sb.Append(">>>>>> _OutputConfigurationFromSentinel Server ");
                            sb.Append(s.Address);
                            sb.Append(" thinks that master is ");
                            sb.Append(master);
                            sb.Append(" and slaves are ");

                            foreach (var slave in slaves)
                            {
                                string name = slave.Where(i => i.Key == "name").Single().Value;
                                bool up = slave.Where(i => i.Key == "flags").Single().Value.Contains("disconnected") == false;

                                sb.Append(name);
                                sb.Append("(");
                                sb.Append(up == true ? "connected" : "down");
                                sb.Append(") ");
                            }
                            sb.Append(">>>>>>");
                            LoggerQueue.Debug(sb.ToString());
                        }
                        catch (Exception ex)
                        {
                            LoggerQueue.Error(string.Format(">>>>>> _OutputConfigurationFromSentinel Could not get configuration from sentinel server ({0}) >>>>>>", s.Address), ex);
                        }
                    }
                    else
                    {
                        LoggerQueue.Error(string.Format(">>>>>> _OutputConfigurationFromSentinel Sentinel server {0} was not connected", s.Address));
                    }
                }
                catch (Exception ex)
                {
                    LoggerQueue.Error(string.Format(">>>>>> _OutputConfigurationFromSentinel Could not get IServer from sentinel ({0}) >>>>>>", s.Address), ex);
                }
            });
        }

        private RedisConnection[] _GetAllRedisServersFromSentinel()
        {
            // ask each sentinel server for its configuration
            List<RedisConnection> redisServers = new List<RedisConnection>();
            m_SentinelServers.ForEach(s =>
            {
                try
                {
                    IServer server = m_Sentinel.GetServer(s.Address, s.Port);
                    if (server.IsConnected == true)
                    {
                        try
                        {
                            // store master in list
                            IPEndPoint master = server.SentinelGetMasterAddressByName("master") as IPEndPoint;
                            redisServers.Add(new RedisConnection { Address = master.Address.ToString(), Port = master.Port });

                            var slaves = server.SentinelSlaves("master");
                            foreach (var slave in slaves)
                            {
                                string address = slave.Where(i => i.Key == "ip").Single().Value;
                                string port = slave.Where(i => i.Key == "port").Single().Value;

                                redisServers.Add(new RedisConnection { Address = address, Port = Convert.ToInt32(port) });
                            }
                        }
                        catch (Exception ex)
                        {
                            LoggerQueue.Error(string.Format(">>>>>> _GetAllRedisServersFromSentinel Could not get redis servers from sentinel server ({0}) >>>>>>", s.Address), ex);
                        }
                    }
                    else
                    {
                        LoggerQueue.Error(string.Format(">>>>>> _GetAllRedisServersFromSentinel Sentinel server {0} was not connected", s.Address));
                    }
                }
                catch (Exception ex)
                {
                    LoggerQueue.Error(string.Format(">>>>>> _GetAllRedisServersFromSentinel Could not get IServer from sentinel ({0}) >>>>>>", s.Address), ex);
                }
            });

            return redisServers.Distinct().ToArray();
        }

        private IServer _GetRedisMasterFromSentinel()
        {
            // ask each sentinel server for its configuration
            foreach (RedisConnection sentinel in m_SentinelServers)
            {
                IServer sentinelServer = _Sentinel.GetServer(sentinel.Address, sentinel.Port);
                if (sentinelServer.IsConnected == true)
                {
                    try
                    {
                        IPEndPoint master = sentinelServer.SentinelGetMasterAddressByName("master") as IPEndPoint;
                        return _Redis.GetServer(master);
                    }
                    catch (Exception ex)
                    {
                        LoggerQueue.Error(string.Format(">>>>>> Could not get redis master from sentinel server ({0}) >>>>>>", sentinel.Address), ex);
                    }
                }
            }

            throw new InvalidOperationException("No sentinel server available to get master");
        }

        private void _ReconnectTimer(Nullable<Int32> reconnectMilliseconds)
        {
            try
            {
                lock (m_ReconnectLocker)
                {
                    if (m_ReconnectTimer != null)
                    {
                        m_ReconnectTimer.Dispose();
                        m_ReconnectTimer = null;
                    }

                    // since a reconnect will definately occur we can stop the check timers for now until reconnect succeeds (where they are recreated)
                    _KillTimers();

                    LoggerQueue.Warn(">>>>>> REDIS STARTING RECONNECT TIMER >>>>>>");

                    m_ReconnectTimer = new Timer(_Connect, false, reconnectMilliseconds.GetValueOrDefault(m_ReconnectTime), Timeout.Infinite);
                }
            }
            catch (Exception ex)
            {
                LoggerQueue.Error("Error during _ReconnectTimer", ex);
            }
        }

        private void _CheckSubscriptions(object state)
        {
            if (Monitor.TryEnter(m_ConnectionLocker, TimeSpan.FromSeconds(1)) == false)
            {
                return;
            }

            try
            {
                DateTime now = DateTime.UtcNow;
                foreach (RedisSubscription subscription in m_Subscriptions.Values)
                {
                    if ((now - subscription.LastUsed) > TimeSpan.FromSeconds(subscription.MaxNoReceiveSeconds))
                    {
                        try
                        {
                            EndPoint endpoint = m_Redis.GetSubscriber().IdentifyEndpoint(subscription.Channel);
                            EndPoint subscribedEndpoint = m_Redis.GetSubscriber().SubscribedEndpoint(subscription.Channel);

                            LoggerQueue.Warn(string.Format(">>>>>> REDIS Channel '{0}' has not been used for longer than {1}s, IsConnected: {2}, IsConnectedChannel: {3}, EndPoint: {4}, SubscribedEndPoint: {5}, reconnecting...", subscription.Channel, subscription.MaxNoReceiveSeconds, m_Redis.GetSubscriber().IsConnected(), m_Redis.GetSubscriber().IsConnected(subscription.Channel), endpoint != null ? endpoint.ToString() : "null", subscribedEndpoint != null ? subscribedEndpoint.ToString() : "null"));
                        }
                        catch (Exception ex)
                        {
                            LoggerQueue.Error(string.Format(">>>>>> REDIS Error logging out details of Channel '{0}' reconnect", subscription.Channel), ex);
                        }

                        _ReconnectTimer(null);
                        return;
                    }
                }
            }
            catch (Exception ex)
            {
                LoggerQueue.Error(">>>>>> REDIS Exception ERROR during _CheckSubscriptions", ex);
            }
            finally
            {
                Monitor.Exit(m_ConnectionLocker);
            }
        }

        private void _CheckWrite(object state)
        {
            if (Monitor.TryEnter(m_ConnectionLocker, TimeSpan.FromSeconds(1)) == false)
            {
                return;
            }

            try
            {
                this.Database.HashSet(Environment.MachineName + "SmartoddsWriteCheck", m_CheckWriteGuid.ToString(), DateTime.UtcNow.Ticks);
            }
            catch (RedisConnectionNotReadyException)
            {
                LoggerQueue.Warn(">>>>>> REDIS RedisConnectionNotReadyException ERROR DURING _CheckWrite");
            }
            catch (RedisServerException ex)
            {
                LoggerQueue.Warn(">>>>>> REDIS RedisServerException ERROR DURING _CheckWrite, reconnecting... - " + ex.Message);
                _ReconnectTimer(null);
            }
            catch (RedisConnectionException ex)
            {
                LoggerQueue.Warn(">>>>>> REDIS RedisConnectionException ERROR DURING _CheckWrite, reconnecting... - " + ex.Message);
                _ReconnectTimer(null);
            }
            catch (TimeoutException ex)
            {
                LoggerQueue.Warn(">>>>>> REDIS TimeoutException ERROR DURING _CheckWrite - " + ex.Message);
            }
            catch (Exception ex)
            {
                LoggerQueue.Error(">>>>>> REDIS Exception ERROR during _CheckWrite", ex);
            }
            finally
            {
                Monitor.Exit(m_ConnectionLocker);
            }
        }

        private void _ConnectionFailure(object sender, ConnectionFailedEventArgs e)
        {
            LoggerQueue.Warn(string.Format(">>>>>> REDIS CONNECTION FAILURE, {0}, {1}, {2} >>>>>>", e.ConnectionType, e.EndPoint.ToString(), e.FailureType));
        }

        private void _ConnectionRestored(object sender, ConnectionFailedEventArgs e)
        {
            LoggerQueue.Warn(string.Format(">>>>>> REDIS CONNECTION RESTORED, {0}, {1}, {2} >>>>>>", e.ConnectionType, e.EndPoint.ToString(), e.FailureType));
        }

        private void _SubscriptionHandler(string channel, RedisValue value)
        {
            // get handler lookup
            RedisSubscription subscription = null;
            if (m_Subscriptions.TryGetValue(channel, out subscription) == false || subscription == null)
            {
                return;
            }

            // update last used
            subscription.LastUsed = DateTime.UtcNow;

            // call handler
            subscription.Handler(channel, value);
        }

        public Int64 Publish(string channel, RedisValue message)
        {
            try
            {
                return _Redis.GetSubscriber().Publish(channel, message);
            }
            catch (RedisConnectionNotReadyException)
            {
                LoggerQueue.Error("REDIS RedisConnectionNotReadyException ERROR DURING Publish");
                throw;
            }
            catch (RedisServerException ex)
            {
                LoggerQueue.Error("REDIS RedisServerException ERROR DURING Publish - " + ex.Message);
                throw;
            }
            catch (RedisConnectionException ex)
            {
                LoggerQueue.Error("REDIS RedisConnectionException ERROR DURING Publish - " + ex.Message);
                throw;
            }
            catch (TimeoutException ex)
            {
                LoggerQueue.Error("REDIS TimeoutException ERROR DURING Publish - " + ex.Message);
                throw;
            }
            catch (Exception ex)
            {
                LoggerQueue.Error("REDIS Exception ERROR DURING Publish", ex);
                throw;
            }
        }

        public bool LockTake(RedisKey key, RedisValue value, TimeSpan expiry)
        {
            return _Execute(() => this.Database.LockTake(key, value, expiry));
        }

        public bool LockExtend(RedisKey key, RedisValue value, TimeSpan extension)
        {
            return _Execute(() => this.Database.LockExtend(key, value, extension));
        }

        public bool LockRelease(RedisKey key, RedisValue value)
        {
            return _Execute(() => this.Database.LockRelease(key, value));
        }

        private void _Execute(Action action)
        {
            try
            {
                action.Invoke();
            }
            catch (RedisServerException ex)
            {
                LoggerQueue.Error("REDIS RedisServerException ERROR DURING _Execute - " + ex.Message);
                throw;
            }
            catch (RedisConnectionException ex)
            {
                LoggerQueue.Error("REDIS RedisConnectionException ERROR DURING _Execute - " + ex.Message);
                throw;
            }
            catch (TimeoutException ex)
            {
                LoggerQueue.Error("REDIS TimeoutException ERROR DURING _Execute - " + ex.Message);
                throw;
            }
            catch (Exception ex)
            {
                LoggerQueue.Error("REDIS Exception ERROR DURING _Execute", ex);
                throw;
            }
        }

        private TResult _Execute<TResult>(Func<TResult> function)
        {
            try
            {
                return function.Invoke();
            }
            catch (RedisServerException ex)
            {
                LoggerQueue.Error("REDIS RedisServerException ERROR DURING _Execute - " + ex.Message);
                throw;
            }
            catch (RedisConnectionException ex)
            {
                LoggerQueue.Error("REDIS RedisConnectionException ERROR DURING _Execute - " + ex.Message);
                throw;
            }
            catch (TimeoutException ex)
            {
                LoggerQueue.Error("REDIS TimeoutException ERROR DURING _Execute - " + ex.Message);
                throw;
            }
            catch (Exception ex)
            {
                LoggerQueue.Error("REDIS ERROR DURING _Execute", ex);
                throw;
            }
        }

        public string[] GetAllKeys(string pattern)
        {
            if (m_Sentinel != null)
            {
                return _GetAnyRedisSlaveFromSentinel().Keys(m_DatabaseId, pattern).Select(k => (string)k).ToArray();
            }
            else
            {
                return _Redis.GetServer(_Redis.GetEndPoints().First()).Keys(m_DatabaseId, pattern).Select(k => (string)k).ToArray();
            }
        }

        private void _KillSentinelClient()
        {
            try
            {
                if (m_Sentinel != null)
                {
                    LoggerQueue.Debug(">>>>>> KILLING SENTINEL CONNECTION >>>>>>");

                    ConnectionMultiplexer sentinel = m_Sentinel;
                    m_Sentinel = null;

                    sentinel.Close(false);
                    sentinel.Dispose();
                }
            }
            catch (Exception ex)
            {
                LoggerQueue.Error(">>>>>> Error during _KillSentinelClient", ex);
            }
        }

        private void _KillRedisClient()
        {
            try
            {
                if (m_Redis != null)
                {
                    Stopwatch sw = Stopwatch.StartNew();
                    LoggerQueue.Debug(">>>>>> KILLING REDIS CONNECTION >>>>>>");

                    ConnectionMultiplexer redis = m_Redis;
                    m_Redis = null;

                    if (this.IsSentinel == true)
                    {
                        redis.ConnectionFailed -= _ConnectionFailure;
                        redis.ConnectionRestored -= _ConnectionRestored;
                    }

                    redis.Close(false);
                    redis.Dispose();

                    LoggerQueue.Debug(">>>>>> KILLED REDIS CONNECTION >>>>>> " + sw.Elapsed);
                }
            }
            catch (Exception ex)
            {
                LoggerQueue.Error(">>>>>> Error during _KillRedisClient", ex);
            }
        }

        private void _KillClients()
        {
            lock (m_ConnectionLocker)
            {
                _KillSentinelClient();
                _KillRedisClient();
            }
        }

        private void _KillTimers()
        {
            if (m_CheckSubscriptionsTimer != null)
            {
                m_CheckSubscriptionsTimer.Dispose();
                m_CheckSubscriptionsTimer = null;
            }
            if (m_CheckWriteTimer != null)
            {
                m_CheckWriteTimer.Dispose();
                m_CheckWriteTimer = null;
            }
        }

        public void Dispose()
        {
            _KillClients();
            _KillTimers();
        }
    }
}