Я не могу найти достойный пример, который показывает, как я могу использовать поток AWS Kinesis через Python. Может кто-нибудь, пожалуйста, предоставит мне несколько примеров, на которые я мог бы обратить внимание?
Лучшие
Я не могу найти достойный пример, который показывает, как я могу использовать поток AWS Kinesis через Python. Может кто-нибудь, пожалуйста, предоставит мне несколько примеров, на которые я мог бы обратить внимание?
Лучшие
Хотя этот вопрос уже был дан ответ, может быть хорошей идеей для будущих читателей рассмотреть возможность использования Kinesis Client Library (KCL) for Python
вместо прямого использования boto
. Это упрощает потребление из потока при наличии нескольких экземпляров клиента и/или изменении конфигураций осколков.
https://aws.amazon.com/blogs/aws/speak-to-kinesis-in-python/
Более полное перечисление того, что предоставляет KCL
Элементы, выделенные полужирным шрифтом, являются теми, которые, по моему мнению, являются тем, где KCL действительно предоставляет нетривиальное значение над boto. Но в зависимости от вашего использования, boto может быть намного проще.
вам следует использовать boto.kinesis:
from boto import kinesis
После создания потока:
шаг 1: подключиться к aws kinesis:
auth = {"aws_access_key_id":"id", "aws_secret_access_key":"key"}
connection = kinesis.connect_to_region('us-east-1',**auth)
Шаг 2: получите информацию о потоке (например, сколько обрывов, если оно активно..)
tries = 0
while tries < 10:
tries += 1
time.sleep(1)
try:
response = connection.describe_stream('stream_name')
if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
break
except :
logger.error('error while trying to describe kinesis stream : %s')
else:
raise TimeoutError('Stream is still not active, aborting...')
Шаг 3: получите все идентификаторы shard и для каждого общего идентификатора получите итератор осколков:
shard_ids = []
stream_name = None
if response and 'StreamDescription' in response:
stream_name = response['StreamDescription']['StreamName']
for shard_id in response['StreamDescription']['Shards']:
shard_id = shard_id['ShardId']
shard_iterator = connection.get_shard_iterator(stream_name, shard_id, shard_iterator_type)
shard_ids.append({'shard_id' : shard_id ,'shard_iterator' : shard_iterator['ShardIterator'] })
Шаг 4: прочитайте данные для каждого осколка
limit - это предел для записей, которые вы хотите получить. (вы можете получить до 10 МБ) shard_iterator является общим с предыдущего шага.
tries = 0
result = []
while tries < 100:
tries += 1
response = connection.get_records(shard_iterator = shard_iterator , limit = limit)
shard_iterator = response['NextShardIterator']
if len(response['Records'])> 0:
for res in response['Records']:
result.append(res['Data'])
return result , shard_iterator
в следующем вызове get_records, вы должны использовать shard_iterator, который вы получили с результатом предыдущих get_records.
note: в одном вызове get_records (limit = None) вы можете получать пустые записи. если вы вызываете get_records с лимитом, вы получите записи, которые находятся в одном ключе раздела (когда вы вставляете данные в поток, вам нужно использовать ключ раздела:
connection.put_record(stream_name, data, partition_key)