Подтвердить что ты не робот

Потребление потока кинезитов в питоне

Я не могу найти достойный пример, который показывает, как я могу использовать поток AWS Kinesis через Python. Может кто-нибудь, пожалуйста, предоставит мне несколько примеров, на которые я мог бы обратить внимание?

Лучшие

4b9b3361

Ответ 1

Хотя этот вопрос уже был дан ответ, может быть хорошей идеей для будущих читателей рассмотреть возможность использования Kinesis Client Library (KCL) for Python вместо прямого использования boto. Это упрощает потребление из потока при наличии нескольких экземпляров клиента и/или изменении конфигураций осколков.

https://aws.amazon.com/blogs/aws/speak-to-kinesis-in-python/

Более полное перечисление того, что предоставляет KCL

  • Подключается к потоку
  • Перечисляет осколки
  • Координаты ассоциаций осколков с другими работниками (если есть)
  • Создает процессор записи для каждого осколка, который он управляет.
  • Вытягивает записи данных из потока
  • Отбрасывает записи в соответствующий процессор записи
  • Контрольные точки обрабатывают записи (он использует DynamoDB, поэтому вашему коду не нужно вручную сохранять значение контрольной точки)
  • Балансы ассоциаций shard-worker, когда число экземпляров рабочего изменяется
  • Балансы ассоциаций оскол-работника, когда осколки разделены или объединены

Элементы, выделенные полужирным шрифтом, являются теми, которые, по моему мнению, являются тем, где KCL действительно предоставляет нетривиальное значение над boto. Но в зависимости от вашего использования, boto может быть намного проще.

Ответ 2

вам следует использовать boto.kinesis:

from boto import kinesis

После создания потока:

шаг 1: подключиться к aws kinesis:

auth = {"aws_access_key_id":"id", "aws_secret_access_key":"key"}
connection = kinesis.connect_to_region('us-east-1',**auth)

Шаг 2: получите информацию о потоке (например, сколько обрывов, если оно активно..)

tries = 0
while tries < 10:
    tries += 1
    time.sleep(1)
    try:
        response = connection.describe_stream('stream_name')   
        if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
            break 
    except :
        logger.error('error while trying to describe kinesis stream : %s')
else:
    raise TimeoutError('Stream is still not active, aborting...')

Шаг 3: получите все идентификаторы shard и для каждого общего идентификатора получите итератор осколков:

shard_ids = []
stream_name = None 
if response and 'StreamDescription' in response:
    stream_name = response['StreamDescription']['StreamName']                   
    for shard_id in response['StreamDescription']['Shards']:
         shard_id = shard_id['ShardId']
         shard_iterator = connection.get_shard_iterator(stream_name, shard_id, shard_iterator_type)
         shard_ids.append({'shard_id' : shard_id ,'shard_iterator' : shard_iterator['ShardIterator'] })

Шаг 4: прочитайте данные для каждого осколка

limit - это предел для записей, которые вы хотите получить. (вы можете получить до 10 МБ) shard_iterator является общим с предыдущего шага.

tries = 0
result = []
while tries < 100:
     tries += 1
     response = connection.get_records(shard_iterator = shard_iterator , limit = limit)
     shard_iterator = response['NextShardIterator']
     if len(response['Records'])> 0:
          for res in response['Records']: 
               result.append(res['Data'])                  
          return result , shard_iterator

в следующем вызове get_records, вы должны использовать shard_iterator, который вы получили с результатом предыдущих get_records.

note: в одном вызове get_records (limit = None) вы можете получать пустые записи. если вы вызываете get_records с лимитом, вы получите записи, которые находятся в одном ключе раздела (когда вы вставляете данные в поток, вам нужно использовать ключ раздела:

connection.put_record(stream_name, data, partition_key)