AWS Kinesis Consumer Python 3.4 Boto

I am trying to create a Kinesis consumer script using the Python 3.4 code below. I want the entries to be saved to a local file, which I can later push to S3:

from boto import kinesis
import time
import json

# AWS Connection Credentials
aws_access_key = 'your_key'
aws_access_secret = 'your_secret key'

# Selected Kinesis Stream
stream = 'TwitterTesting'

# Aws Authentication
auth = {"aws_access_key_id": aws_access_key, "aws_secret_access_key": aws_access_secret}
conn = kinesis.connect_to_region('us-east-1',**auth)

# Targeted file to be pushed to S3 bucket
fileName = "KinesisDataTest2.txt"
file = open("C:\\Users\\csanders\\PycharmProjects\\untitled\\KinesisDataTest.txt", "a")

# Describe stream and get shards
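tries = 0
while tries < 10:
    tries += 1
    time.sleep(1)  # pause between status checks
    response = conn.describe_stream(stream)
    if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
        break
else:
    # Runs only if the loop finished without hitting break
    raise TimeoutError('Stream is still not active, aborting...')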

# Get Shard Iterator and get records from stream
shard_ids = []
stream_name = None
if response and 'StreamDescription' in response:
    stream_name = response['StreamDescription']['StreamName']
    for shard_id in response['StreamDescription']['Shards']:
        shard_id = shard_id['ShardId']
        shard_iterator = conn.get_shard_iterator(stream,
        shard_id, 'TRIM_HORIZON')
        shard_ids.append({'shard_id': shard_id, 'shard_iterator': shard_iterator['ShardIterator']})
        tries = 0
        result = []
        while tries < 100:
            tries += 1
            response = conn.get_records(shard_iterator, 100)
            shard_iterator = response['NextShardIterator']
            if len(response['Records'])> 0:
                for res in response['Records']:
                    print(result, shard_iterator)


For some reason, when I run this script, I get the following error every time:

Traceback (most recent call last):
  File "C:/Users/csanders/PycharmProjects/untitled/",  line 57, in <module>
    response = json.load(conn.get_records(shard_ids, 100))
  File "C:\Python34\lib\site-packages\boto-2.38.0-py3.4.egg\boto\kinesis\", line 327, in get_records
  File "C:\Python34\lib\site-packages\boto-2.38.0-py3.4.egg\boto\kinesis\", line 874, in make_request
boto.exception.JSONResponseError: JSONResponseError: 400 Bad Request
{'Message': 'Start of list found where not expected', '__type':   'SerializationException'}


My ultimate goal is to dump this data into an S3 bucket. For now I just need to get these records back and print them. The data coming into the stream is a JSON dump of Twitter data, written with the put_record function. I can also post that code if needed.

Update: I changed that one line from response = json.load(conn.get_records(shard_ids, 100)) to response = conn.get_records(shard_iterator, 100)



2 answers

response = json.load(conn.get_records(shard_ids, 100))


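get_records expects a single shard iterator string, not a list of shards. When it is passed shard_ids, the request fails: the 400 response from Kinesis says the request is malformed ('Start of list found where not expected'). The json.load wrapper is also unnecessary, because boto already returns the response as a parsed dict.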



If you replace the following, it will work. (Set the "while" condition according to how many records you would like to collect; to loop indefinitely, use "while tries == 0" and remove "tries += 1".)

    shard_iterator = conn.get_shard_iterator(stream,
    shard_id, 'TRIM_HORIZON')
    shard_ids.append({'shard_id': shard_id, 'shard_iterator': shard_iterator['ShardIterator']})


with the following:

    shard_iterator = conn.get_shard_iterator(stream,
    shard_id, "LATEST")["ShardIterator"]
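
To make the point about the iterator concrete, here is a minimal sketch of a per-shard read loop. The helper name read_shard and its max_polls, limit and pause parameters are made up for illustration; only get_shard_iterator and get_records are boto calls. Note that 'TRIM_HORIZON' starts at the oldest record still in the shard, while 'LATEST' only returns records that arrive after the iterator is created, so the choice depends on whether you want the existing backlog.

```python
import time


def read_shard(conn, stream, shard_id, iterator_type="TRIM_HORIZON",
               max_polls=100, limit=100, pause=0.0):
    """Collect records from one shard by following NextShardIterator.

    `conn` is assumed to be a boto Kinesis connection (hypothetical helper).
    """
    # get_shard_iterator returns a dict; get_records needs the string inside it.
    shard_iterator = conn.get_shard_iterator(
        stream, shard_id, iterator_type)["ShardIterator"]
    records = []
    for _ in range(max_polls):
        response = conn.get_records(shard_iterator, limit)
        records.extend(response["Records"])
        # Each response carries the iterator to use for the next call.
        shard_iterator = response.get("NextShardIterator")
        if shard_iterator is None:  # shard was closed, nothing more to read
            break
        time.sleep(pause)  # optional pause between polls
    return records
```

Called as read_shard(conn, stream, shard_id) inside the loop over response['StreamDescription']['Shards'], this would return a plain list of record dicts for that shard.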


Also, to write to the file (with "\n" for a new line), replace:

print(result, shard_iterator)

with:

file.write(str(result) + "\n")
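
If the goal is one tweet per line in the local file, a small helper along these lines may be easier to work with than writing the whole list at once. This is only a sketch: append_records is a hypothetical name, and it assumes each record is a dict with a 'Data' key holding UTF-8 text (or bytes that decode as UTF-8).

```python
def append_records(path, records):
    """Append each record's Data payload to `path`, one payload per line."""
    with open(path, "a") as out:
        for record in records:
            data = record["Data"]
            # Assumption: payloads are UTF-8 text; decode if they arrive as bytes.
            if isinstance(data, bytes):
                data = data.decode("utf-8")
            out.write(str(data) + "\n")
    return len(records)
```

For example, append_records("KinesisDataTest.txt", response['Records']) inside the polling loop would add the current batch to the file, and the with block closes the file after each batch.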


Hope it helps.


