Подтвердить что ты не робот

Redis + ActionController:: Живые потоки не умирают

Фон: Мы создали функцию чата в одном из наших существующих приложений Rails. Мы используем новый модуль ActionController::Live и запускаем Puma (с выпуском Nginx) и подписываемся на сообщения через Redis. Мы используем клиентскую сторону EventSource, чтобы установить соединение асинхронно.

Резюме проблемы: Нити никогда не умирают, когда соединение завершается.

Например, если пользователь переместится, закройте браузер или даже перейдете на другую страницу в приложении, появится новый поток (как и ожидалось), но старый продолжает жить.

Проблема, как я сейчас вижу, заключается в том, что когда возникает какая-либо из этих ситуаций, сервер не знает, завершается ли соединение в конце браузера, пока что-то не попытается написать этот сломанный поток, чего никогда не произойдет как только браузер уйдет с исходной страницы.

Эта проблема представляется документированной в github, и подобные вопросы задаются в StackOverflow здесь (довольно точно такой же вопрос) и здесь (относительно получения количества активных потоков).

Единственное решение, с которым я смог придумать, основываясь на этих сообщениях, - это реализовать тип ниток/подключение покера. Попытка написать сломанное соединение создает IOError, который я могу уловить и правильно закрыть соединение, позволяя потоку умереть. Это код контроллера для этого решения:

def events
  response.headers["Content-Type"] = "text/event-stream"

  stream_error = false; # used by flusher thread to determine when to stop

  redis = Redis.new

  # Subscribe to our events
  redis.subscribe("message.create", "message.user_list_update") do |on| 
    on.message do |event, data| # when message is received, write to stream
      response.stream.write("messageType: '#{event}', data: #{data}\n\n")
    end

    # This is the monitor / connection poker thread
    # Periodically poke the connection by attempting to write to the stream
    flusher_thread = Thread.new do
      while !stream_error
        $redis.publish "message.create", "flusher_test"
        sleep 2.seconds
      end
    end
  end 

  rescue IOError
    logger.info "Stream closed"
    stream_error = true;
  ensure
    logger.info "Events action is quitting redis and closing stream!"
    redis.quit
    response.stream.close
end

(Примечание: метод events, кажется, блокируется при вызове метода subscribe. Все остальное (потоковая передача) работает правильно, поэтому я предполагаю, что это нормально.)

(Другое примечание: концепция потока flusher имеет больше смысла как один длительный фоновый процесс, немного похожий на сборщик мусорных потоков. Проблема с моей реализацией выше заключается в том, что для каждого соединения создается новый поток, который бессмысленно. Любой, кто пытается реализовать эту концепцию, должен сделать это больше как один процесс, а не столько, как я изложил. Я обновлю этот пост, когда я успешно повторю его реализацию как один фоновый процесс.)

Недостатком этого решения является то, что мы только задерживали или уменьшали проблему, а не полностью ее устраняли. У нас все еще есть 2 потока на пользователя, в дополнение к другим запросам, таким как ajax, который кажется ужасным с точки зрения масштабирования; он кажется совершенно недосягаемым и нецелесообразным для более крупной системы со многими возможными параллельными соединениями.

Я чувствую, что мне не хватает чего-то жизненно важного; Мне трудно поверить, что у Rails есть функция, которая настолько явно сломана, не реализуя настраиваемый контролер соединений, как я это делал.

Вопрос:. Как мы разрешаем соединениям/потокам умирать, не реализуя что-то банально, например, "покер покера" или сборщик мусора?

Как всегда, дайте мне знать, если я что-то оставил.

Обновление Просто добавьте немного дополнительной информации: Huetsch over at github опубликовал этот комментарий, указав, что SSE основан на TCP, который обычно отправляет FIN пакет, когда соединение закрыто, что позволяет другому концу (сервер в этом случае) знать, что его безопасно закрыть соединение. Huetsch указывает, что либо браузер не отправляет этот пакет (возможно, ошибка в библиотеке EventSource?), Либо Rails не ловит его или ничего не делает с ним (определенно, ошибка в Rails, если это так). Поиск продолжается...

Другое обновление Используя Wireshark, я действительно могу увидеть пакеты FIN, отправляемые. По общему признанию, я не очень осведомлен или опыт работы с уровнем протокола, однако из того, что я могу сказать, я определенно обнаруживаю, что пакет FIN, отправляемый из браузера, когда я устанавливаю SSE-соединение с использованием EventSource из браузера, и NO-пакет отправляется, если я удалите это соединение (что означает отсутствие SSE). Хотя я не ужасно осведомлен о своих знаниях TCP, это, по-видимому, указывает мне, что соединение действительно заканчивается клиентом; возможно, это указывает на ошибку в Puma или Rails.

Еще одно обновление @JamesBoutcher/boutcheratwest (github) указал мне на обсуждение на веб-сайте redis относительноэтот вопрос, особенно в связи с тем, что метод .(p)subscribe никогда не закрывается. Плакат на этом сайте указал то же самое, что мы обнаружили здесь, что среда Rails никогда не уведомляется, когда клиентское соединение закрыто и поэтому не может выполнить метод .(p)unsubscribe. Он спрашивает о тайм-ауте для метода .(p)subscribe, который, как я думаю, будет работать, хотя я не уверен, какой метод (связанный покер, который я описал выше, или его предложение тайм-аута) будет лучшим решением. В идеале, для решения для подключения покера я хотел бы найти способ определить, закрыто ли соединение на другом конце без записи в поток. Как сейчас, как вы можете видеть, мне нужно реализовать код на стороне клиента, чтобы обрабатывать мое "выталкивающее" сообщение отдельно, что, по моему мнению, навязчиво и тупо, как черт.

4b9b3361

Ответ 1

Решение, которое я только что сделал (заимствуя много от @teeg), который, похоже, работает нормально (не пробовал проверить его, tho)

конфигурации/Инициализаторы/redis.rb

$redis = Redis.new(:host => "xxxx.com", :port => 6379)

heartbeat_thread = Thread.new do
  while true
    $redis.publish("heartbeat","thump")
    sleep 30.seconds
  end
end

at_exit do
  # not sure this is needed, but just in case
  heartbeat_thread.kill
  $redis.quit
end

И затем в моем контроллере:

def events
    response.headers["Content-Type"] = "text/event-stream"
    redis = Redis.new(:host => "xxxxxxx.com", :port => 6379)
    logger.info "New stream starting, connecting to redis"
    redis.subscribe(['parse.new','heartbeat']) do |on|
      on.message do |event, data|
        if event == 'parse.new'
          response.stream.write("event: parse\ndata: #{data}\n\n")
        elsif event == 'heartbeat'
          response.stream.write("event: heartbeat\ndata: heartbeat\n\n")
        end
      end
    end
  rescue IOError
    logger.info "Stream closed"
  ensure
    logger.info "Stopping stream thread"
    redis.quit
    response.stream.close
  end

Ответ 2

В настоящее время я создаю приложение, которое вращается вокруг ActionController: Live, EventSource и Puma, а для тех, кто сталкивается с проблемами закрытия потоков и т.д. вместо спасения IOError, в Rails 4.2 вам нужно спасти ClientDisconnected, Пример:

def stream
  #Begin is not required
  twitter_client = Twitter::Streaming::Client.new(config_params) do |obj|
    # Do something
  end
rescue ClientDisconnected
  # Do something when disconnected
ensure
  # Do something else to ensure the stream is closed
end

Я нашел этот удобный совет с этого сообщения на форуме (полностью внизу): http://railscasts.com/episodes/401-actioncontroller-live?view=comments

Ответ 3

Основываясь на @James Boutcher, я использовал следующее в кластерной Puma с двумя рабочими, так что у меня есть только 1 поток, созданный для heartbeat в config/initializers/redis.rb:

конфигурации/puma.rb

on_worker_boot do |index|
  puts "worker nb #{index.to_s} booting"
  create_heartbeat if index.to_i==0
end

def create_heartbeat
  puts "creating heartbeat"
  $redis||=Redis.new
  heartbeat = Thread.new do
    ActiveRecord::Base.connection_pool.release_connection
    begin
      while true
        hash={event: "heartbeat",data: "heartbeat"}
        $redis.publish("heartbeat",hash.to_json)
        sleep 20.seconds
      end
    ensure
      #no db connection anyway
    end
  end
end

Ответ 4

Здесь возможно более простое решение, не использующее биение сердца. После долгих исследований и экспериментов, здесь код, который я использую с синатрой + синатровой жемчужиной (который должен быть легко адаптирован к Rails 4):

class EventServer < Sinatra::Base
 include Sinatra::SSE
 set :connections, []
 .
 .
 .
 get '/channel/:channel' do
 .
 .
 .
  sse_stream do |out|
    settings.connections << out
    out.callback {
      puts 'Client disconnected from sse';
      settings.connections.delete(out);
    }
  redis.subscribe(channel) do |on|
      on.subscribe do |channel, subscriptions|
        puts "Subscribed to redis ##{channel}\n"
      end
      on.message do |channel, message|
        puts "Message from redis ##{channel}: #{message}\n"
        message = JSON.parse(message)
        .
        .
        .
        if settings.connections.include?(out)
          out.push(message)
        else
          puts 'closing orphaned redis connection'
          redis.unsubscribe
        end
      end
    end
  end
end

Redis соединяет блоки on.message и принимает только команды (p) subscribe/(p) unsubscribe. После отмены подписки соединение redis больше не блокируется и может быть выпущено объектом веб-сервера, который был создан в результате первоначального запроса sse. Он автоматически очищается, когда вы получаете сообщение redis, а sse-соединение с браузером больше не существует в массиве коллекции.

Ответ 5

Здесь вы найдете решение с таймаутом, которое выйдет из блокировки Redis. (p) подписаться на вызов и убить неиспользуемый прогон соединения.

class Stream::FixedController < StreamController
  def events
    # Rails reserve a db connection from connection pool for
    # each request, lets put it back into connection pool.
    ActiveRecord::Base.clear_active_connections!

    # Last time of any (except heartbeat) activity on stream
    # it mean last time of any message was send from server to client
    # or time of setting new connection
    @last_active = Time.zone.now

    # Redis (p)subscribe is blocking request so we need do some trick
    # to prevent it freeze request forever.
    redis.psubscribe("messages:*", 'heartbeat') do |on|
      on.pmessage do |pattern, event, data|
        # capture heartbeat from Redis pub/sub
        if event == 'heartbeat'
          # calculate idle time (in secounds) for this stream connection
          idle_time = (Time.zone.now - @last_active).to_i

          # Now we need to relase connection with Redis.(p)subscribe
          # chanel to allow go of any Exception (like connection closed)
          if idle_time > 4.minutes
            # unsubscribe from Redis because of idle time was to long
            # that all - fix in (almost)one line :)
            redis.punsubscribe
          end
        else
          # save time of this (last) activity
          @last_active = Time.zone.now
        end
        # write to stream - even heartbeat - it sometimes chance to
        # capture dissconection error before idle_time
        response.stream.write("event: #{event}\ndata: #{data}\n\n")
      end
    end
    # blicking end (no chance to get below this line without unsubscribe)
  rescue IOError
    Logs::Stream.info "Stream closed"
  rescue ClientDisconnected
    Logs::Stream.info "ClientDisconnected"
  rescue ActionController::Live::ClientDisconnected
    Logs::Stream.info "Live::ClientDisconnected"
  ensure
    Logs::Stream.info "Stream ensure close"
    redis.quit
    response.stream.close
  end
end

Вы должны использовать reds. (p) отказаться от подписки, чтобы завершить этот блокирующий вызов. Никакое исключение не может нарушить это.

Мое простое приложение с информацией об этом исправлении: https://github.com/piotr-kedziak/redis-subscribe-stream-puma-fix

Ответ 6

Вместо того, чтобы посылать пульс всем клиентам, было бы проще просто установить сторожевой таймер для каждого подключения. [Спасибо @NeilJewers]

class Stream::FixedController < StreamController
  def events
    # Rails reserve a db connection from connection pool for
    # each request, lets put it back into connection pool.
    ActiveRecord::Base.clear_active_connections!

    redis = Redis.new

    watchdog = Doberman::WatchDog.new(:timeout => 20.seconds)
    watchdog.start

    # Redis (p)subscribe is blocking request so we need do some trick
    # to prevent it freeze request forever.
    redis.psubscribe("messages:*") do |on|
      on.pmessage do |pattern, event, data|
        begin
          # write to stream - even heartbeat - it sometimes chance to
          response.stream.write("event: #{event}\ndata: #{data}\n\n")
          watchdog.ping

        rescue Doberman::WatchDog::Timeout => e
          raise ClientDisconnected if response.stream.closed?
          watchdog.ping
        end
      end
    end

  rescue IOError
  rescue ClientDisconnected

  ensure
    response.stream.close
    redis.quit
    watchdog.stop
  end
end