Подтвердить что ты не робот

Использование Tweepy для прослушивания потока и поиска твитов. Как остановить предыдущий поиск и прослушать только новый поток?

Я использую Flask и Tweepy для поиска живых твитов. На интерфейсе у меня есть текстовый ввод пользователя и кнопка "Поиск". В идеале, когда пользователь вводит поисковый запрос во вход и нажимает кнопку "Поиск", Tweepy должен прослушивать новый поисковый запрос и останавливать предыдущий поток в поисковых терминах. При нажатии кнопки "Поиск" выполняется эта функция:

@app.route('/search', methods=['POST'])
# gets search-keyword and starts stream
def streamTweets():
    search_term = request.form['tweet']
    search_term_hashtag = '#' + search_term
    # instantiate listener
    listener = StdOutListener()
    # stream object uses listener we instantiated above to listen for data
    stream = tweepy.Stream(auth, listener)

    if stream is not None:
        print "Stream disconnected..."
        stream.disconnect()

    stream.filter(track=[search_term or search_term_hashtag], async=True)
    redirect('/stream') # execute '/stream' sse
    return render_template('index.html')

Маршрут /stream, который выполняется во второй-последней строке в приведенном выше коде, выглядит следующим образом:

@app.route('/stream')
def stream():
    # we will use Pub/Sub process to send real-time tweets to client
    def event_stream():
        # instantiate pubsub
        pubsub = red.pubsub()
        # subscribe to tweet_stream channel
        pubsub.subscribe('tweet_stream')
        # initiate server-sent events on messages pushed to channel
        for message in pubsub.listen():
            yield 'data: %s\n\n' % message['data']
    return Response(stream_with_context(event_stream()), mimetype="text/event-stream")

Мой код работает отлично, в том смысле, что он запускает новый поток и ищет заданный термин всякий раз, когда нажимается кнопка "Поиск", но не останавливает предыдущий поиск. Например, если мой первый поисковый запрос был "NYC", а затем я хотел найти другой термин, скажем, "Лос-Анджелес", он даст мне результаты как для "NYC", так и для "Лос-Анджелеса", что не то, что я хотеть. Я хочу, чтобы меня обыскали только "Лос-Анджелес". Как это исправить? Другими словами, как остановить предыдущий поток? Я просмотрел другие предыдущие потоки, и я знаю, что мне нужно использовать stream.disconnect(), но я не уверен, как реализовать это в своем коде. Любая помощь или ввод были бы весьма полезными. Большое спасибо!

4b9b3361

Ответ 1

Ниже приведен код, который отменяет старые потоки при создании нового потока. Он работает, добавляя новые потоки в глобальный список и затем вызывающий stream.disconnect() во всех потоках в списке всякий раз, когда создается новый поток.

diff --git a/app.py b/app.py
index 1e3ed10..f416ddc 100755
--- a/app.py
+++ b/app.py
@@ -23,6 +23,8 @@ auth.set_access_token(access_token, access_token_secret)
 app = Flask(__name__)
 red = redis.StrictRedis()

+# Add a place to keep track of current streams
+streams = []

 @app.route('/')
 def index():
@@ -32,12 +34,18 @@ def index():
 @app.route('/search', methods=['POST'])
 # gets search-keyword and starts stream
 def streamTweets():
+        # cancel old streams
+        for stream in streams:
+            stream.disconnect()
+
        search_term = request.form['tweet']
        search_term_hashtag = '#' + search_term
        # instantiate listener
        listener = StdOutListener()
        # stream object uses listener we instantiated above to listen for data
        stream = tweepy.Stream(auth, listener)
+        # add this stream to the global list
+        streams.append(stream)
        stream.filter(track=[search_term or search_term_hashtag],
                async=True) # make sure stream is non-blocking
        redirect('/stream') # execute '/stream' sse

То, что это не решает, - проблема управления сеансом. При вашей текущей настройке поиск одним пользователем повлияет на поиск всех пользователей. Этого можно избежать, предоставляя вашим пользователям некоторый идентификатор и сохраняя свои потоки вместе с их идентификатором. Самый простой способ сделать это, скорее всего, будет использовать поддержку Flask session. Вы также можете сделать это с помощью requestId, как предложил Пьер. В любом случае вам также понадобится код, чтобы заметить, когда пользователь закрыл страницу и закрыл ее поток.

Ответ 2

Отказ от ответственности: я ничего не знаю о Tweepy, но это, похоже, проблема дизайна.

Вы пытаетесь добавить состояние в RESTful API? У вас может возникнуть проблема с дизайном. Как ответил JRichardSnape, ваш API не должен заботиться об отмене запроса; это должно быть сделано в интерфейсе. Я имею в виду здесь, в javascript/AJAX/etc, вызывающем эту функцию, добавление другого вызова в новую функцию

@app.route('/cancelSearch', methods=['POST']) С помощью "POST", в котором есть условия поиска. Пока у вас нет состояния, вы не можете сделать это безопасно в асинхронном вызове: Представьте, что кто-то другой делает один и тот же поиск в то же время, а затем отменяет его, отменяет оба (помните, у вас нет состояния, чтобы вы не знали, кого вы отменяете). Возможно, вам нужно состояние с вашим дизайном.

Если вы должны продолжать использовать это и не против нарушать правило "без гражданства", добавьте "состояние" к вашему запросу. В этом случае это не так уж плохо, потому что вы можете запустить поток и называть его с помощью userId, а затем убить поток каждый новый поиск

def streamTweets():
    search_term = request.form['tweet']
    userId = request.form['userId'] # If your limit is one request per user at a time. If multiple windows can be opened and you want to follow this limit, store userId in a cookie.
    #Look for any request currently running with this ID, and cancel them

В качестве альтернативы вы можете вернуть requestId, который вы оставите в интерфейсе, можете вызвать cancelSearch?requestId=$requestId. В cancelSearch вам нужно будет найти ожидающий запрос (звучит как в tweepy, так как вы не используете свои собственные потоки) и отключите его.

Из любопытства я просто смотрел, что происходит при поиске в Google, и он использует запрос GET. Посмотрите (инструменты отладки → Сеть, затем введите текст и посмотрите автозаполнение). Google использует токен, отправленный с каждым запросом (каждый раз, когда вы вводите что-то)). Это не значит, что он используется для этого, но это в основном то, что я описал. Если вы не хотите сеанса, используйте уникальный идентификатор.

Ответ 3

Ну, я решил это, используя метод таймера. Но все же я ищу питоновский путь.

from streamer import StreamListener
def stream():
    hashtag = input
    #assign each user an ID ( for pubsub )
    StreamListener.userid = random_user_id
    def handler(signum, frame):
        print("Forever is over")
        raise Exception("end of time")

    def main_stream():
        stream = tweepy.Stream(auth, StreamListener())
        stream.filter(track=track,async=True)
        redirect(url_for('map_stream'))

    def close_stream():
        # this is for closing client list in redis but don't know it working
        obj = redis.client_list(tweet_stream)
        redis_client_list = obj[0]['addr']
        redis.client_kill(redis_client_list)
        stream = tweepy.Stream(auth, StreamListener())
        stream.disconnect()

    import signal
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(300)
    try:
        main_stream()
    except Exception:
        close_stream()
        print("function terminate")