Skip to main content

KinesisStream

caution

This is the SST v0.x Constructs doc. SST v1 is now released. If you are using v1, see the v1 Constructs doc. If you are looking to upgrade to v1, check out the migration steps.

The KinesisStream construct is a higher level CDK construct that makes it easy to create a Kinesis Data Stream. You can create a stream and add a list of consumers to it.

This construct makes it easy to define a stream and its consumers. It also internally connects the consumers and the stream together.

Initializer

new KinesisStream(scope: Construct, id: string, props: KinesisStreamProps)

Parameters

Examples

Using the minimal config

import { KinesisStream } from "@serverless-stack/resources";

new KinesisStream(this, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
});

Adding consumers

Add consumers after the stream has been created.

const stream = new KinesisStream(this, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
});

stream.addConsumers(this, {
consumer3: "src/consumer3.main",
});

Lazily adding consumers

Create an empty stream and lazily add the consumers.

const stream = new KinesisStream(this, "Stream");

stream.addConsumers(this, {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
});

Specifying function props for all the consumers

You can extend the minimal config, to set some function props and have them apply to all the consumers.

new KinesisStream(this, "Stream", {
defaultFunctionProps: {
timeout: 20,
environment: { tableName: table.tableName },
permissions: [table],
},
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
});

Using the full config

If you wanted to configure each Lambda function separately, you can pass in the KinesisStreamConsumerProps.

new KinesisStream(this, "Stream", {
consumers: {
consumer1: {
function: {
handler: "src/consumer1.main",
timeout: 10,
environment: { tableName: table.tableName },
permissions: [table],
},
}
},
});

Note that, you can set the defaultFunctionProps while using the function per consumer. The function will just override the defaultFunctionProps. Except for the environment, the layers, and the permissions properties, that will be merged.

new KinesisStream(this, "Stream", {
defaultFunctionProps: {
timeout: 20,
environment: { tableName: table.tableName },
permissions: [table],
},
consumers: {
consumer1: {
function: {
handler: "src/consumer1.main",
timeout: 10,
environment: { bucketName: bucket.bucketName },
permissions: [bucket],
},
},
consumer2: "src/consumer2.main",
},
});

So in the above example, the consumer1 function doesn't use the timeout that is set in the defaultFunctionProps. It'll instead use the one that is defined in the function definition (10 seconds). And the function will have both the tableName and the bucketName environment variables set; as well as permissions to both the table and the bucket.

Giving the consumers some permissions

Allow the consumer functions to access S3.

const stream = new KinesisStream(this, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
});

stream.attachPermissions(["s3"]);

Giving a specific consumers some permissions

Allow a specific consumer function to access S3.

const stream = new KinesisStream(this, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
});

stream.attachPermissionsToConsumer("consumer1", ["s3"]);

Configuring the Kinesis stream

Configure the internally created CDK Stream instance.

new KinesisStream(this, "Stream", {
consumers: {
consumer1: "src/consumer1.main",
consumer2: "src/consumer2.main",
}
kinesisStream: {
shardCount: 3,
},
});

Configuring a consumer

Configure the internally created CDK Event Source.

import { StartingPosition } from "aws-cdk-lib/aws-lambda";

new KinesisStream(this, "Stream", {
consumers: {
consumer1: {
function: "src/consumer1.main",
consumerProps: {
startingPosition: StartingPosition.LATEST,
},
},
}
});

Importing an existing stream

Override the internally created CDK Stream instance.

import { Stream } from "aws-cdk-lib/aws-kinesis";

new KinesisStream(this, "Stream", {
kinesisStream: Stream.fromStreamArn(this, "ImportedStream", streamArn),
});

Properties

An instance of KinesisStream contains the following properties.

streamArn

Type: string

The ARN of the internally created CDK Stream instance.

streamName

Type: string

The name of the internally created CDK Stream instance.

kinesisStream

Type : cdk.aws-kinesis.Stream

The internally created CDK Stream instance.

Methods

An instance of KinesisStream contains the following methods.

getFunction

getFunction(consumerName: string): Function

Parameters

  • consumerName string

Returns

Get the instance of the internally created Function, for a given consumer. Where the consumerName is the name used to define a consumer.

addConsumers

addConsumers(scope: cdk.Construct, consumers: { [consumerName: string]: FunctionDefinition | KinesisStreamConsumerProps })

Parameters

  • scope cdk.Construct
  • consumers { [consumerName: string]: FunctionDefinition | KinesisStreamConsumerProps }

An associative array with the consumer name being a string and the value is either a FunctionDefinition or the KinesisStreamConsumerProps.

attachPermissions

attachPermissions(permissions: Permissions)

Parameters

Attaches the given list of permissions to all the consumer functions. This allows the consumers to access other AWS resources.

Internally calls Function.attachPermissions.

attachPermissionsToConsumer

attachPermissionsToConsumer(consumerName: string, permissions: Permissions)

Parameters

Attaches the given list of permissions to a specific consumer. This allows that function to access other AWS resources.

Internally calls Function.attachPermissions.

KinesisStreamProps

consumers?

Type : { [consumerName: string]: FunctionDefinition | KinesisStreamConsumerProps }, defaults to {}

The consumers for this stream. Takes an associative array, with the consumer name being a string and the value is either a FunctionDefinition or the KinesisStreamConsumerProps.

caution

You should not change the name of a consumer.

Note, if the consumerName is changed, CloudFormation will remove the existing consumer and create a new one. If the starting point is set to TRIM_HORIZON, all the historical records available in the stream will be resent to the new consumer.

kinesisStream?

Type : cdk.aws-kinesis.Stream | cdk.aws-kinesis.StreamProps, defaults to undefined

Or optionally pass in a CDK cdk.aws-kinesis.StreamProps instance or a cdk.aws-kinesis.Stream instance. This allows you to override the default settings this construct uses internally to create the stream.

defaultFunctionProps?

Type : FunctionProps, defaults to {}

The default function props to be applied to all the Lambda functions in the Stream. If the function is specified for a consumer, these default values are overridden. Except for the environment, the layers, and the permissions properties, that will be merged.

KinesisStreamConsumerProps

function

Type : FunctionDefinition

A FunctionDefinition object that'll be used to create the consumer function for the stream.

consumerProps?

Type : cdk.aws-lambda-event-sources.lambdaEventSources.KinesisEventSourceProps, defaults to KinesisEventSourceProps with starting point set to LATEST.

Or optionally pass in a CDK KinesisEventSourceProps. This allows you to override the default settings this construct uses internally to create the consumer.