
KinesisStream

The KinesisStream construct is a higher-level CDK construct that makes it easy to create a Kinesis Data Stream. You define the stream together with a list of consumers, and the construct connects the consumers to the stream internally.

Constructor

new KinesisStream(scope, id, props)

Parameters

Configuring consumers

Lazily adding consumers

Add consumers after the stream has been created.

const stream = new KinesisStream(stack, "Stream", {
  consumers: {
    consumer1: "src/consumer1.main",
    consumer2: "src/consumer2.main",
  },
});

stream.addConsumers(stack, {
  consumer3: "src/consumer3.main",
});

Specifying function props for all the consumers

Extend the minimal config to set function props that apply to all the consumers.

new KinesisStream(stack, "Stream", {
  defaults: {
    function: {
      timeout: 20,
      environment: { tableName: table.tableName },
      permissions: [table],
    },
  },
  consumers: {
    consumer1: "src/consumer1.main",
    consumer2: "src/consumer2.main",
  },
});

Configuring an individual consumer

Configure each Lambda function separately.

new KinesisStream(stack, "Stream", {
  consumers: {
    consumer1: {
      function: {
        handler: "src/consumer1.main",
        timeout: 10,
        environment: { tableName: table.tableName },
        permissions: [table],
      },
    },
  },
});

Note that you can set defaults.function while also configuring function for an individual consumer. The consumer's function props override defaults.function, except for the environment, layers, and permissions properties, which are merged.

new KinesisStream(stack, "Stream", {
  defaults: {
    function: {
      timeout: 20,
      environment: { tableName: table.tableName },
      permissions: [table],
    },
  },
  consumers: {
    consumer1: {
      function: {
        handler: "src/consumer1.main",
        timeout: 10,
        environment: { bucketName: bucket.bucketName },
        permissions: [bucket],
      },
    },
    consumer2: "src/consumer2.main",
  },
});

In the example above, the consumer1 function does not use the timeout set in defaults.function; it uses the one from its own function definition (10 seconds). The function also gets both the tableName and bucketName environment variables, as well as permissions to both the table and the bucket.
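The override-and-merge behavior described above can be modeled with a small standalone sketch. The MergeableFunctionProps type and the mergeFunctionProps helper are hypothetical names introduced only for illustration; they are not SST's internal implementation, and permissions are represented as plain strings for simplicity.

```typescript
// Illustrative model only: this type and helper are hypothetical,
// not part of SST.
interface MergeableFunctionProps {
  handler?: string;
  timeout?: number;
  environment?: Record<string, string>;
  permissions?: string[];
  layers?: string[];
}

function mergeFunctionProps(
  defaults: MergeableFunctionProps,
  override: MergeableFunctionProps
): MergeableFunctionProps {
  return {
    // Scalar props such as timeout: the per-consumer value replaces the default.
    ...defaults,
    ...override,
    // environment, permissions, and layers are combined instead of replaced.
    environment: { ...defaults.environment, ...override.environment },
    permissions: [...(defaults.permissions ?? []), ...(override.permissions ?? [])],
    layers: [...(defaults.layers ?? []), ...(override.layers ?? [])],
  };
}

// Mirrors the consumer1 definition from the example above, with placeholder values.
const consumer1 = mergeFunctionProps(
  { timeout: 20, environment: { tableName: "my-table" }, permissions: ["table"] },
  {
    handler: "src/consumer1.main",
    timeout: 10,
    environment: { bucketName: "my-bucket" },
    permissions: ["bucket"],
  }
);

console.log(consumer1.timeout); // 10
console.log(consumer1.environment); // contains both tableName and bucketName
console.log(consumer1.permissions); // contains both "table" and "bucket"
```

In this model consumer2, which is given only as a handler string, would simply receive the defaults unchanged.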

Configuring consumer event source

Configure the internally created CDK Event Source.

import { StartingPosition } from "aws-cdk-lib/aws-lambda";

new KinesisStream(stack, "Stream", {
  consumers: {
    consumer1: {
      function: "src/consumer1.main",
      cdk: {
        eventSource: {
          startingPosition: StartingPosition.LATEST,
        },
      },
    },
  },
});

Giving the consumers some permissions

Allow the consumer functions to access S3.

const stream = new KinesisStream(stack, "Stream", {
  consumers: {
    consumer1: "src/consumer1.main",
    consumer2: "src/consumer2.main",
  },
});

stream.attachPermissions(["s3"]);

Giving a specific consumer some permissions

Allow a specific consumer function to access S3.

const stream = new KinesisStream(stack, "Stream", {
  consumers: {
    consumer1: "src/consumer1.main",
    consumer2: "src/consumer2.main",
  },
});

stream.attachPermissionsToConsumer("consumer1", ["s3"]);

Advanced examples

Configuring the Kinesis stream

Configure the internally created CDK Stream instance.

new KinesisStream(stack, "Stream", {
  consumers: {
    consumer1: "src/consumer1.main",
    consumer2: "src/consumer2.main",
  },
  cdk: {
    stream: {
      shardCount: 3,
    },
  },
});

Importing an existing stream

Override the internally created CDK Stream instance.

import { Stream } from "aws-cdk-lib/aws-kinesis";

new KinesisStream(stack, "Stream", {
  cdk: {
    stream: Stream.fromStreamArn(stack, "ImportedStream", streamArn),
  },
});

KinesisStreamProps

consumers?

Type : Record<string, string | Function | KinesisStreamConsumerProps>

Define the function consumers for this stream

new KinesisStream(stack, "Stream", {
  consumers: {
    consumer1: "src/consumer1.main",
    consumer2: {
      function: {
        handler: "src/consumer2.handler",
        timeout: 30,
      },
    },
  },
});
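The consumers type accepts either a handler string or a full consumer definition. As a rough sketch of how the two forms relate, the standalone code below expands both into the same explicit shape. All type names and the normalizeConsumer helper are hypothetical and exist only for this illustration; the Function construct variant of the union is left out, and SST's real types carry many more fields.

```typescript
// Hypothetical shapes for illustration; not SST's actual types.
interface FunctionPropsSketch {
  handler: string;
  timeout?: number;
}

interface ConsumerPropsSketch {
  function: string | FunctionPropsSketch;
}

type ConsumerDefinition = string | ConsumerPropsSketch;

// Expand either form into an explicit { function: { handler, ... } } shape.
function normalizeConsumer(
  def: ConsumerDefinition
): { function: FunctionPropsSketch } {
  const fn = typeof def === "string" ? def : def.function;
  return { function: typeof fn === "string" ? { handler: fn } : fn };
}

// Same definitions as in the example above.
const consumers: Record<string, ConsumerDefinition> = {
  consumer1: "src/consumer1.main",
  consumer2: { function: { handler: "src/consumer2.handler", timeout: 30 } },
};

const normalized: Record<string, { function: FunctionPropsSketch }> = {};
for (const name of Object.keys(consumers)) {
  normalized[name] = normalizeConsumer(consumers[name]);
}

console.log(normalized.consumer1.function.handler); // src/consumer1.main
console.log(normalized.consumer2.function.timeout); // 30
```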

defaults.function?

Type : FunctionProps

The default function props to be applied to all the consumer Lambda functions of the stream. The environment, permissions, and layers properties are merged with the per-consumer definitions if they are defined.

new KinesisStream(stack, "Stream", {
  defaults: {
    function: {
      timeout: 20,
    },
  },
});

cdk.stream?

Type : IStream | StreamProps

Override the internally created Kinesis Stream

new KinesisStream(stack, "Stream", {
  cdk: {
    stream: {
      streamName: "my-stream",
    },
  },
});

Properties

An instance of KinesisStream has the following properties.

streamArn

Type : string

The ARN of the internally created Kinesis Stream

streamName

Type : string

The name of the internally created Kinesis Stream

cdk.stream

Type : IStream

The internally created Kinesis Stream instance

Methods

An instance of KinesisStream has the following methods.

addConsumers

addConsumers(scope, consumers)

Parameters

Add consumers to a stream after creating it

stream.addConsumers(stack, {
  consumer1: "src/function.handler",
});

attachPermissions

attachPermissions(permissions)

Parameters

Attaches the given list of permissions to all the consumers. This allows the functions to access other AWS resources.

stream.attachPermissions(["s3"]);

attachPermissionsToConsumer

attachPermissionsToConsumer(consumerName, permissions)

Parameters

Attaches the given list of permissions to a specific consumer. This allows that function to access other AWS resources.

stream.attachPermissionsToConsumer("consumer1", ["s3"]);

getFunction

getFunction(consumerName)

Parameters

  • consumerName string

Get the function for a specific consumer

stream.getFunction("consumer1");

KinesisStreamConsumerProps

Used to define the function consumer for the stream

function

Type : string | Function | FunctionProps

The function definition

new KinesisStream(stack, "Stream", {
  consumers: {
    consumer1: {
      function: {
        handler: "src/consumer1.handler",
        timeout: 30,
      },
    },
  },
});
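For context, a consumer such as src/consumer1.handler receives batches of Kinesis records whose payloads arrive base64-encoded in the kinesis.data field. The sketch below shows one way such a handler might look, assuming text payloads. The local event types and the decodeRecords helper are assumptions made for this example; they declare only the fields used here and are not part of SST.

```typescript
// Minimal local types covering only the Kinesis event fields used below.
interface KinesisRecordSketch {
  kinesis: {
    partitionKey: string;
    sequenceNumber: string;
    data: string; // base64-encoded payload
  };
}

interface KinesisEventSketch {
  Records: KinesisRecordSketch[];
}

// Hypothetical helper: decode every record payload in the batch to a UTF-8 string.
function decodeRecords(event: KinesisEventSketch): string[] {
  return event.Records.map((record) =>
    Buffer.from(record.kinesis.data, "base64").toString("utf8")
  );
}

// Exported under the name referenced by "src/consumer1.handler".
export async function handler(event: KinesisEventSketch): Promise<void> {
  for (const payload of decodeRecords(event)) {
    console.log("received record:", payload);
  }
}
```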

cdk.eventSource?

Type : KinesisEventSourceProps

Override the internally created event source

new KinesisStream(stack, "Stream", {
  consumers: {
    fun: {
      function: "src/consumer1.main",
      cdk: {
        eventSource: {
          enabled: false,
        },
      },
    },
  },
});