AWS Kinesis Consumer Python 3.4 Boto

I am trying to create a Kinesis consumer script using the Python 3.4 code below. I want the records to be saved to a local file, which I can later push to S3:

from boto import kinesis
import time
import json

# AWS Connection Credentials
aws_access_key = 'your_key'
aws_access_secret = 'your_secret key'

# Selected Kinesis Stream
stream = 'TwitterTesting'

# Aws Authentication
auth = {"aws_access_key_id": aws_access_key, "aws_secret_access_key": aws_access_secret}
conn = kinesis.connect_to_region('us-east-1',**auth)

# Targeted file to be pushed to S3 bucket
fileName = "KinesisDataTest2.txt"
file = open("C:\\Users\\csanders\\PycharmProjects\\untitled\\KinesisDataTest.txt", "a")

# Describe stream and get shards
tries = 0
while tries < 10:
    tries += 1
    time.sleep(1)
    response = conn.describe_stream(stream)
    if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
        break
else:
    raise TimeoutError('Stream is still not active, aborting...')

# Get Shard Iterator and get records from stream
shard_ids = []
stream_name = None
if response and 'StreamDescription' in response:
    stream_name = response['StreamDescription']['StreamName']
    for shard_id in response['StreamDescription']['Shards']:
        shard_id = shard_id['ShardId']
        shard_iterator = conn.get_shard_iterator(stream,
                                                 shard_id, 'TRIM_HORIZON')
        shard_ids.append({'shard_id': shard_id, 'shard_iterator': shard_iterator['ShardIterator']})
        tries = 0
        result = []
        while tries < 100:
            tries += 1
            response = conn.get_records(shard_iterator, 100)
            shard_iterator = response['NextShardIterator']
            if len(response['Records'])> 0:
                for res in response['Records']:
                    result.append(res['Data'])
                    print(result, shard_iterator)


For some reason, when I run this script, I get the following error every time:

Traceback (most recent call last):
  File "C:/Users/csanders/PycharmProjects/untitled/Get_records_Kinesis.py", line 57, in <module>
    response = json.load(conn.get_records(shard_ids, 100))
  File "C:\Python34\lib\site-packages\boto-2.38.0-py3.4.egg\boto\kinesis\layer1.py", line 327, in get_records
    body=json.dumps(params))
  File "C:\Python34\lib\site-packages\boto-2.38.0-py3.4.egg\boto\kinesis\layer1.py", line 874, in make_request
    body=json_body)
boto.exception.JSONResponseError: JSONResponseError: 400 Bad Request
{'Message': 'Start of list found where not expected', '__type': 'SerializationException'}


My ultimate goal is to dump this data into an S3 bucket. For now, I just need to get these records back and print them. The data coming into the stream is a JSON dump of Twitter data, sent with the put_record function. I can also post that code if needed.

Update: I changed that one line from response = json.load(conn.get_records(shard_ids, 100)) to response = conn.get_records(shard_iterator, 100).

2 answers


response = json.load(conn.get_records(shard_ids, 100))

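get_records expects a single shard iterator (a string), not the list of shards in shard_ids. When boto serializes the request, that list becomes a JSON array where Kinesis expects a string, so Kinesis rejects the request with 400 Bad Request ("Start of list found where not expected"). Also, get_records already returns a decoded dict, so there is no need to wrap it in json.load.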



http://boto.readthedocs.org/en/latest/ref/kinesis.html?highlight=get_records#boto.kinesis.layer1.KinesisConnection.get_records
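To see where the "list" comes from, here is a minimal sketch that builds the GetRecords request body roughly the way boto 2's layer1 get_records appears to (a ShardIterator key plus an optional Limit, passed through json.dumps, as the body=json.dumps(params) frame in the traceback suggests). The helper name build_get_records_body is hypothetical and only approximates boto's internals; it is not part of boto.

```python
import json


def build_get_records_body(shard_iterator, limit=None):
    """Hypothetical helper approximating the JSON body boto 2 sends for GetRecords."""
    params = {"ShardIterator": shard_iterator}
    if limit is not None:
        params["Limit"] = limit
    return json.dumps(params)


# Wrong: the list of shard dicts is serialized as a JSON array, which is what
# Kinesis rejects with "Start of list found where not expected".
shard_ids = [{"shard_id": "shardId-000000000000", "shard_iterator": "AAAA"}]
bad_body = build_get_records_body(shard_ids, 100)

# Right: one iterator string, e.g. conn.get_shard_iterator(...)["ShardIterator"].
good_body = build_get_records_body("AAAA", 100)

print(bad_body)
print(good_body)
```

The same reasoning applies to the dict returned by get_shard_iterator itself: pass its "ShardIterator" value, not the whole dict.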



If you replace the following, it will work. (Set the "while" condition according to how many records you want to collect; to keep polling forever, use "while True:" and remove "tries += 1".)

    shard_iterator = conn.get_shard_iterator(stream,
                                             shard_id, 'TRIM_HORIZON')
    shard_ids.append({'shard_id': shard_id, 'shard_iterator': shard_iterator['ShardIterator']})

with the following:

    shard_iterator = conn.get_shard_iterator(stream,
                                             shard_id, "LATEST")["ShardIterator"]
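The whole per-shard loop can be sketched like this, assuming conn behaves like boto 2's KinesisConnection (get_shard_iterator(stream_name, shard_id, shard_iterator_type) returning a dict with a "ShardIterator" string, and get_records(shard_iterator, limit) returning a dict with "Records" and "NextShardIterator"). The function read_shard and the class FakeKinesisConnection are hypothetical names for illustration; the fake only exists so the loop can be tried without AWS.

```python
def read_shard(conn, stream, shard_id, iterator_type="LATEST", max_polls=100, limit=100):
    """Collect the "Data" payloads from one shard; max_polls=None polls forever."""
    # Index into the response so we pass the iterator *string*, not the dict.
    shard_iterator = conn.get_shard_iterator(stream, shard_id, iterator_type)["ShardIterator"]
    result = []
    polls = 0
    while shard_iterator is not None and (max_polls is None or polls < max_polls):
        polls += 1
        response = conn.get_records(shard_iterator, limit)
        for record in response["Records"]:
            result.append(record["Data"])
        # Each response carries the iterator for the next call; it is None once
        # the shard is closed. A real consumer would usually time.sleep() briefly
        # here instead of calling get_records in a tight loop.
        shard_iterator = response.get("NextShardIterator")
    return result


class FakeKinesisConnection(object):
    """Hypothetical in-memory stand-in for KinesisConnection, for trying read_shard offline."""

    def __init__(self, batches):
        self._batches = batches  # one list of payloads per get_records call

    def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type):
        return {"ShardIterator": "iter-0"}

    def get_records(self, shard_iterator, limit=None):
        # Loosely mimic the service, which rejects a non-string ShardIterator.
        if not isinstance(shard_iterator, str):
            raise TypeError("shard_iterator must be a string")
        index = int(shard_iterator.split("-")[1])
        records = [{"Data": data} for data in self._batches[index][:limit]]
        has_more = index + 1 < len(self._batches)
        return {"Records": records,
                "NextShardIterator": "iter-%d" % (index + 1) if has_more else None}


conn = FakeKinesisConnection([['{"text": "a"}'], [], ['{"text": "b"}', '{"text": "c"}']])
print(read_shard(conn, "TwitterTesting", "shardId-000000000000"))
```

With a real connection you would call read_shard(conn, stream, shard_id) for each shard in response['StreamDescription']['Shards']. Note that "LATEST" only returns records put after the iterator is created, while "TRIM_HORIZON" starts from the oldest record still retained in the stream.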

Also, to write the results to the file (with "\n" for a new line), change:



print(result, shard_iterator)

to:

file.write(str(result) + "\n")
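Since result keeps growing and the write sits inside the per-record loop, str(result) rewrites all earlier records on every iteration. A minimal variant, assuming each payload is a JSON string (as the question says the tweets are) and that you want one JSON document per line before pushing the file to S3, might look like this; append_records is a hypothetical helper name.

```python
import json


def append_records(path, records):
    """Append each JSON payload to `path` as exactly one line; return how many were written."""
    # The with-block closes (and flushes) the file, so it is complete before upload.
    with open(path, "a", encoding="utf-8") as out:
        for data in records:
            # Re-serializing collapses pretty-printed JSON onto a single line.
            out.write(json.dumps(json.loads(data)) + "\n")
    return len(records)
```

You would call it once per get_records response, e.g. append_records("KinesisDataTest.txt", [res['Data'] for res in response['Records']]).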


Hope it helps.
