-- Connecting to capsule.adrianhesketh.com:1965...
If you've got a file processing job that might take more than the 15 minutes AWS Lambda allows, or that needs more than the 10GB of RAM Lambda can provide, you'll need to consider an alternative way to run it.
While it's possible to break S3 files up into chunks and process them in parallel using Step Functions [0] [1], sometimes you don't want the jobs processed as quickly as possible, or don't want to take on the developer complexity of learning Step Functions.
As with all things AWS, there are a lot of options.
It's possible to start an EC2 instance to run the job, but that requires either building an operating system image containing the program I want to run, or building a user data script that downloads the program to the instance on startup.
Both of these strategies have problems - it's quite slow to build EC2 images, and it makes startup time slow if the user data configures the server each time it starts up. In addition, I'd have to handle pushing log data to CloudWatch myself, by installing and configuring the CloudWatch Logs Agent.
AWS Batch is an AWS service that's focussed on this type of job, but it seems designed around the concept of having worker nodes available to run tasks. There's a fairly complicated setup required to get going if you want to set it up using infrastructure as code techniques.
I much prefer to run a Docker container in Fargate because Docker makes it easy to test locally, and is quick to make changes to, while Fargate is a way of running Docker containers without worrying about the underlying operating system.
The overall architecture of this solution requires:
Creating a Dockerfile containing the task to run.
Building the Docker image and uploading it to a Docker registry (ECR).
Creating a Fargate task definition to instruct it to run the Docker image.
Creating an S3 bucket to store files in.
Creating a Lambda function that's triggered when a new file is added to S3, and runs the Fargate Task Definition.
┌──────────────┐
│  Dockerfile  │
└───────┬──────┘
        │
        ▼
┌─────────────────────┐
│ Container registry  │
└──────────┬──────────┘
           │
           ▼
┌────────┐     ┌──────┐     ┌──────────┐     ┌────────────────┐
│  File  ├────►│  S3  ├────►│  Lambda  ├───►│  Fargate task  │
└────────┘     └──┬───┘     └──────────┘    └────────┬───────┘
                  ▲                                  │
                  └──────────────────────────────────┘
                                Read
The example code is organised into three directories:
/task - Contains the program code, and the Dockerfile for the processing task.
/taskrunner - Contains the program code for the Lambda function that will start the processing task.
/cdk - Contains all of the code to create the infrastructure.
For the example, I created a "Hello, World" Go program. Outside of this example, this program might download the uploaded file from S3 using the Go AWS SDK and process it.
package main

import "fmt"

func main() {
	fmt.Println("OK, I ran.")
}
To build the program, I configured a multi-stage Dockerfile.
The file contains a "build container" based on `golang:1.17` which runs `go build`. This outputs an executable called `app`.
The file also contains a "runtime container" based on `ubuntu:latest` which uses the `app` from the build container (`COPY --from=0 /src/app ./`) and runs it.
Later, we'll see that CDK can use this Dockerfile to build and upload the container to ECR.
FROM golang:1.17
WORKDIR /src/
COPY . /src/
RUN go build -o app

FROM ubuntu:latest
RUN apt-get update && apt-get install -y ca-certificates
WORKDIR /run/
COPY --from=0 /src/app ./
CMD [ "./app" ]
The task runner Lambda function is responsible for starting the Fargate task. The API call to start a Fargate task requires a number of parameters, some of which the Lambda function handler code expects to be found in environment variables.
The required parameters include the ID of the cluster, the container to run, the definition of the task, and the VPC subnets that the task should run within.
type config struct {
	ClusterARN        string
	ContainerName     string
	TaskDefinitionARN string
	Subnets           []string
	S3Bucket          string
	IsValid           bool
}
The Lambda function handler is configured to expect to receive S3 events, and calls the ECS `RunTask` API to start the Fargate task for each S3 event it receives.
I've configured the task to run in a public subnet so that I don't end up paying for NAT gateways in my VPC, but your requirements here might be different.
func handler(ctx context.Context, s3Event events.S3Event) (err error) {
	logger.Info("Starting...")
	svc := ecs.New(session.New())
	for i, record := range s3Event.Records {
		s3 := record.S3
		logger.Info("Processing File",
			zap.Int("index", i),
			zap.Int("count", len(s3Event.Records)),
			zap.String("bucketName", s3.Bucket.Name),
			zap.String("objectKey", s3.Object.Key))
		input := &ecs.RunTaskInput{
			Cluster:        &c.ClusterARN,
			TaskDefinition: &c.TaskDefinitionARN,
			NetworkConfiguration: &ecs.NetworkConfiguration{
				AwsvpcConfiguration: &ecs.AwsVpcConfiguration{
					// Set to true if in the public subnet so that the container can be downloaded.
					AssignPublicIp: aws.String(ecs.AssignPublicIpEnabled),
					Subnets:        aws.StringSlice(c.Subnets),
				},
			},
			Overrides: &ecs.TaskOverride{
				ContainerOverrides: []*ecs.ContainerOverride{{
					Name: &c.ContainerName,
					Environment: []*ecs.KeyValuePair{
						{Name: aws.String("S3_KEY"), Value: &s3.Object.Key},
						{Name: aws.String("S3_BUCKET"), Value: &c.S3Bucket},
					},
				}},
			},
			LaunchType: aws.String(ecs.LaunchTypeFargate),
		}
		res, err := svc.RunTask(input)
		if err != nil {
			logger.Error("Failed to run task", zap.Error(err))
			return err
		}
		for _, task := range res.Tasks {
			logger.Info("Started task", zap.String("taskId", *task.TaskArn))
		}
	}
	return
}
With the task code and the task starter code in place, the components can be connected with CDK.
The bucket that receives files will later be configured to trigger the Lambda function, but for now, it's just a basic bucket.
sourceBucket := awss3.NewBucket(stack, jsii.String("sourceBucket"), &awss3.BucketProps{})
sourceBucket.DisallowPublicAccess()
ECS tasks need to be run inside a VPC, so one needs to be created.
I've chosen to run everything in the public subnets to save on costs, because if you run in private subnets, you'll need to use NAT gateways to access the Internet, or configure a VPC endpoint to enable access to download the ECR images.
vpc := awsec2.NewVpc(stack, jsii.String("taskVpc"), &awsec2.VpcProps{
	// If you're setting up NAT gateways, you might want to drop to 2 to save a few pounds.
	MaxAzs: jsii.Number(2),
	// If NatGateways are available, we can host in any subnet.
	// But they're a waste of money for this.
	// I'll host them in the public subnet instead.
	NatGateways: jsii.Number(0),
})
The ECS cluster itself has a required parameter for the VPC it will run tasks within. There are no servers in the cluster, since it's going to run on-demand Fargate tasks which shut themselves down after completion.
cluster := awsecs.NewCluster(stack, jsii.String("ecsCluster"), &awsecs.ClusterProps{
	Vpc: vpc,
})
Two IAM roles are required to be able to create an ECS task.
The first role required to create an ECS task is a task execution role (`ter`). This is used to start the task, and needs to have permission to download the containers from ECR, and write to logs.
There's a managed task execution role that could be used, but the CDK type doesn't have the handy `GrantPassRole` helper on it, so I recreated it using the documentation at [2] instead.
ter := awsiam.NewRole(stack, jsii.String("taskExecutionRole"), &awsiam.RoleProps{
	AssumedBy: awsiam.NewServicePrincipal(jsii.String("ecs-tasks.amazonaws.com"), &awsiam.ServicePrincipalOpts{}),
})
ter.AddToPolicy(awsiam.NewPolicyStatement(&awsiam.PolicyStatementProps{
	Actions: jsii.Strings("ecr:BatchCheckLayerAvailability", "ecr:GetDownloadUrlForLayer", "ecr:BatchGetImage",
		"logs:CreateLogStream", "logs:PutLogEvents", "ecr:GetAuthorizationToken"),
	Resources: jsii.Strings("*"),
}))
The second role required to create an ECS task is the task role (`tr`). This role is used by the container when it's executing to access AWS resources.
The role needs permission to allow `stdout` and `stderr` to be written to CloudWatch Logs, and I've also given it permission to read from the source bucket with `sourceBucket.GrantRead(tr, nil)` so that the task can read the file that triggered the event.
tr := awsiam.NewRole(stack, jsii.String("taskRole"), &awsiam.RoleProps{
	AssumedBy: awsiam.NewServicePrincipal(jsii.String("ecs-tasks.amazonaws.com"), &awsiam.ServicePrincipalOpts{}),
})
tr.AddToPolicy(awsiam.NewPolicyStatement(&awsiam.PolicyStatementProps{
	Actions:   jsii.Strings("logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"),
	Resources: jsii.Strings("*"),
}))
sourceBucket.GrantRead(tr, nil)
With that out of the way, it's possible to define the task. The first part defines the roles that are used, and how much RAM and CPU the task is allocated. There are limited options for the RAM and CPU values, so you'll need to refer to the documentation at [3] to choose valid values.
td := awsecs.NewFargateTaskDefinition(stack, jsii.String("taskDefinition"), &awsecs.FargateTaskDefinitionProps{
	MemoryLimitMiB: jsii.Number(512),
	Cpu:            jsii.Number(256),
	ExecutionRole:  ter,
	TaskRole:       tr,
})
The task definition needs to be given a Docker container that it should run.
The `awsecs.AssetImage_FromAsset` function call in CDK builds the Dockerfile, creates an ECR repository in AWS, and uploads the build to it. So simple.
taskContainer := td.AddContainer(jsii.String("taskContainer"), &awsecs.ContainerDefinitionOptions{
	Image: awsecs.AssetImage_FromAsset(jsii.String("../task"), &awsecs.AssetImageProps{}),
	Logging: awsecs.LogDriver_AwsLogs(&awsecs.AwsLogDriverProps{
		StreamPrefix: jsii.String("task"),
	}),
})
To start the ECS task when the file hits the S3 bucket, a Lambda function needs to be configured to run the Lambda handler code we defined earlier.
The Lambda function needs permission to run the ECS task. There's a built-in policy for Lambda called `service-role/AWSLambdaBasicExecutionRole` which we can use as a base. This gives the Lambda function the ability to write to CloudWatch Logs, for example.
taskStarterRole := awsiam.NewRole(stack, jsii.String("taskStarterRole"), &awsiam.RoleProps{
	AssumedBy: awsiam.NewServicePrincipal(jsii.String("lambda.amazonaws.com"), &awsiam.ServicePrincipalOpts{}),
})
taskStarterRole.AddManagedPolicy(awsiam.ManagedPolicy_FromAwsManagedPolicyName(jsii.String("service-role/AWSLambdaBasicExecutionRole")))
With the basic permissions in place, we can give the role permission to run the ECS task on the cluster.
taskStarterRole.AddToPolicy(awsiam.NewPolicyStatement(&awsiam.PolicyStatementProps{
	Actions:   jsii.Strings("ecs:RunTask"),
	Resources: jsii.Strings(*cluster.ClusterArn(), *td.TaskDefinitionArn()),
}))
This is the tricky bit. The Lambda function needs permission to run the task using the specific "task execution role" and "task role" that we defined earlier. This is done by granting `PassRole` on those roles to the Lambda function's role.
td.ExecutionRole().GrantPassRole(taskStarterRole)
td.TaskRole().GrantPassRole(taskStarterRole)
Now CDK can be used to build the `taskrunner` Lambda handler Go code, zip it up and create a Lambda function.
I've given the Lambda function 512MB of RAM, but it would happily run with 128MB because the function code is just making a single AWS API call to start the ECS task. It's the ECS task that will do all the hard work.
The Lambda handler code expects some environment variables containing configuration to be present, so these are populated by the CDK code.
taskStarter := awslambdago.NewGoFunction(stack, jsii.String("taskStarter"), &awslambdago.GoFunctionProps{
	Runtime: awslambda.Runtime_GO_1_X(),
	Entry:   jsii.String("../taskrunner"),
	Bundling: &awslambdago.BundlingOptions{
		GoBuildFlags: &[]*string{jsii.String(`-ldflags "-s -w"`)},
	},
	Environment: &map[string]*string{
		"CLUSTER_ARN":         cluster.ClusterArn(),
		"CONTAINER_NAME":      taskContainer.ContainerName(),
		"TASK_DEFINITION_ARN": td.TaskDefinitionArn(),
		"SUBNETS":             jsii.String(strings.Join(*getSubnetIDs(vpc.PublicSubnets()), ",")),
		"S3_BUCKET":           sourceBucket.BucketName(),
	},
	MemorySize: jsii.Number(512),
	Role:       taskStarterRole,
	Timeout:    awscdk.Duration_Millis(jsii.Number(60000)),
})
Finally, the Lambda function can be configured to fire when an object is added to the S3 bucket.
taskStarter.AddEventSource(awslambdaeventsources.NewS3EventSource(sourceBucket, &awslambdaeventsources.S3EventSourceProps{
	Events: &[]awss3.EventType{
		awss3.EventType_OBJECT_CREATED,
	},
}))
Lambda's time and RAM limits mean that sometimes you might find it easier to run one-off processing jobs in a serverless container environment instead.
A good way of doing this is with Fargate tasks, which spin up ephemeral containers to execute your code.
The code is available over at:
-- Page fetched on Sun Apr 28 01:17:20 2024