KinesisStream

The KinesisStream construct is a higher level CDK construct that makes it easy to create a Kinesis Data Stream. You can create a stream and add a list of consumers to it.

This construct makes it easy to define a stream and its consumers. It also internally connects the consumers and the stream together.

Initializer#

new KinesisStream(scope: Construct, id: string, props: KinesisStreamProps)

Parameters

Examples#

Using the minimal config#

import { KinesisStream } from "@serverless-stack/resources";
new KinesisStream(this, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
});

Adding consumers#

Add consumers after the stream has been created.

const stream = new KinesisStream(this, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
});
stream.addConsumers(this, {
consumer3: "src/consumer3.main",
});

Lazily adding consumers#

Create an empty stream and lazily add the consumers.

const stream = new KinesisStream(this, "Stream");
stream.addConsumers(this, {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
});

Specifying function props for all the consumers#

You can extend the minimal config, to set some function props and have them apply to all the consumers.

new KinesisStream(this, "Stream", {
defaultFunctionProps: {
timeout: 20,
environment: { tableName: table.tableName },
permissions: [table],
},
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
});

Using the full config#

If you wanted to configure each Lambda function separately, you can pass in the KinesisStreamConsumerProps.

new KinesisStream(this, "Stream", {
consumers: {
consumer1: {
function: {
handler: "src/consumer1.main",
timeout: 10,
environment: { tableName: table.tableName },
permissions: [table],
},
}
},
});

Note that, you can set the defaultFunctionProps while using the function per consumer. The function will just override the defaultFunctionProps. Except for the environment, the layers, and the permissions properties, that will be merged.

new KinesisStream(this, "Stream", {
defaultFunctionProps: {
timeout: 20,
environment: { tableName: table.tableName },
permissions: [table],
},
consumers: {
consumer1: {
function: {
handler: "src/consumer1.main",
timeout: 10,
environment: { bucketName: bucket.bucketName },
permissions: [bucket],
},
},
consumer2: "src/consumer2.main",
},
});

So in the above example, the consumer1 function doesn't use the timeout that is set in the defaultFunctionProps. It'll instead use the one that is defined in the function definition (10 seconds). And the function will have both the tableName and the bucketName environment variables set; as well as permissions to both the table and the bucket.

Giving the consumers some permissions#

Allow the consumer functions to access S3.

const stream = new KinesisStream(this, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
});
stream.attachPermissions(["s3"]);

Giving a specific consumers some permissions#

Allow a specific consumer function to access S3.

const stream = new KinesisStream(this, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
});
stream.attachPermissionsToConsumer("consumer1", ["s3"]);

Configuring the Kinesis stream#

Configure the internally created CDK Stream instance.

new KinesisStream(this, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
kinesisStream: {
shardCount: 3,
},
});

Configuring a consumer#

Configure the internally created CDK Event Source.

import { StartingPosition } from "@aws-cdk/aws-lambda";
new KinesisStream(this, "Stream", {
consumers: {
consumer1: {
function: "src/consumer1.main",
consumerProps: {
startingPosition: StartingPosition.LATEST,
},
},
}
});

Importing an existing stream#

Override the internally created CDK Stream instance.

import { Stream } from "@aws-cdk/aws-kinesis";
new KinesisStream(this, "Stream", {
kinesisStream: Stream.fromStreamArn(this, "ImportedStream", streamArn),
});

Properties#

An instance of KinesisStream contains the following properties.

streamArn#

Type: string

The ARN of the internally created CDK Stream instance.

streamName#

Type: string

The name of the internally created CDK Stream instance.

kinesisStream#

Type : cdk.aws-kinesis.Stream

The internally created CDK Stream instance.

Methods#

An instance of KinesisStream contains the following methods.

getFunction#

getFunction(consumerName: string): Function

Parameters

  • consumerName string

Returns

Get the instance of the internally created Function, for a given consumer. Where the consumerName is the name used to define a consumer.

addConsumers#

addConsumers(scope: cdk.Construct, consumers: { [consumerName: string]: FunctionDefinition | KinesisStreamConsumerProps })

Parameters

  • scope cdk.Construct
  • consumers { [consumerName: string]: FunctionDefinition | KinesisStreamConsumerProps }

An associative array with the consumer name being a string and the value is either a FunctionDefinition or the KinesisStreamConsumerProps.

attachPermissions#

attachPermissions(permissions: Permissions)

Parameters

Attaches the given list of permissions to all the consumer functions. This allows the consumers to access other AWS resources.

Internally calls Function.attachPermissions.

attachPermissionsToConsumer#

attachPermissionsToConsumer(consumerName: string, permissions: Permissions)

Parameters

Attaches the given list of permissions to a specific consumer. This allows that function to access other AWS resources.

Internally calls Function.attachPermissions.

KinesisStreamProps#

consumers?#

Type : { [consumerName: string]: FunctionDefinition | KinesisStreamConsumerProps }, defaults to {}

The consumers for this stream. Takes an associative array, with the consumer name being a string and the value is either a FunctionDefinition or the KinesisStreamConsumerProps.

caution

You should not change the name of a consumer.

Note, if the consumerName is changed, CloudFormation will remove the existing consumer and create a new one. If the starting point is set to TRIM_HORIZON, all the historical records available in the stream will be resent to the new consumer.

kinesisStream?#

Type : cdk.aws-kinesis.Stream | cdk.aws-kinesis.StreamProps, defaults to undefined

Or optionally pass in a CDK cdk.aws-kinesis.StreamProps instance or a cdk.aws-kinesis.Stream instance. This allows you to override the default settings this construct uses internally to create the stream.

defaultFunctionProps?#

Type : FunctionProps, defaults to {}

The default function props to be applied to all the Lambda functions in the Stream. If the function is specified for a consumer, these default values are overridden. Except for the environment, the layers, and the permissions properties, that will be merged.

KinesisStreamConsumerProps#

function#

Type : FunctionDefinition

A FunctionDefinition object that'll be used to create the consumer function for the stream.

consumerProps?#

Type : cdk.aws-lambda-event-sources.lambdaEventSources.KinesisEventSourceProps, defaults to KinesisEventSourceProps with starting point set to LATEST.

Or optionally pass in a CDK KinesisEventSourceProps. This allows you to override the default settings this construct uses internally to create the consumer.