Modern data pipelines are often a tangled mess of scripts, cron jobs, and monolithic applications that are brittle and difficult to scale. The dream is an automated, event-driven system that just works. The serverless paradigm, championed by services like AWS Lambda, gets us part of the way there. But what about the most complex part: the data transformation logic itself?
This is where you can hit a wall. Embedding complex rules for data mapping, cleansing, and format conversion directly into your Lambda functions creates tight coupling, making updates a chore and reuse nearly impossible.
In this guide, you'll learn how to build a powerful, event-driven, and highly scalable serverless data processing pipeline. We'll connect the dots between AWS S3, AWS Lambda, and transform.do to create a system where your infrastructure is lean and your transformation logic is clean, version-controlled, and managed as a simple service.
Before we dive into the code, let's look at the architecture we're building. The flow is simple: a raw JSON file lands in a source S3 bucket, the upload event triggers a Lambda function, the Lambda sends the file's contents to a transform.do agent over its API, and the transformed result is written to a destination S3 bucket.
This approach separates your concerns cleanly. AWS handles the event-driven infrastructure, while transform.do handles the specialized task of data transformation.
The heart of our pipeline is the transformation logic. By defining this on transform.do first, we create a stable API endpoint that our Lambda can call. This is the essence of "ETL as Code."
// Example transform.do agent configuration
const transformations = {
  targetFormat: "csv", // We want to convert from JSON to CSV
  rules: [
    // Rule 1: Rename fields to be more database-friendly
    { rename: { "user_id": "ID", "first_name": "FirstName", "last_name": "LastName", "join_date": "MemberSince" } },
    // Rule 2: Convert the timestamp to a simple date format
    { convert: { "MemberSince": "date('YYYY-MM-DD')" } },
    // Rule 3: Add a new, derived field for a full name
    { addField: { "FullName": "{{FirstName}} {{LastName}}" } },
    // Rule 4: Ensure only specific columns are in the final output
    { select: ["ID", "FullName", "MemberSince"] }
  ]
};
Once you save this agent, transform.do provides you with a unique and secure API endpoint. Copy this endpoint URL and your API key—you'll need them for your Lambda function.
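Before wiring the agent into Lambda, it's worth sanity-checking it from your local machine. Here is a minimal sketch (run with Node.js after npm install axios); the endpoint URL and API key are placeholders you replace with your own, and the request and response shapes match what our Lambda will use below:
// check-agent.mjs — quick local sanity check of the transform.do agent
import axios from "axios";

const TRANSFORM_DO_ENDPOINT = "YOUR_ENDPOINT_URL"; // paste the endpoint transform.do gave you
const TRANSFORM_DO_API_KEY = "YOUR_API_KEY";       // paste your API key

// A single sample record in the raw input shape
const sample = [
  { user_id: 101, first_name: "Jane", last_name: "Doe", join_date: "2023-01-15T10:00:00Z" }
];

const response = await axios.post(
  TRANSFORM_DO_ENDPOINT,
  { source: sample }, // the payload carries the source data
  { headers: { Authorization: `Bearer ${TRANSFORM_DO_API_KEY}`, "Content-Type": "application/json" } }
);

// The transformed output is returned in the 'data' field
console.log(response.data.data);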
We need two S3 buckets to act as the start and end points of our pipeline.
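You can create them in the S3 console, or script it. Here is a minimal sketch with the AWS SDK for JavaScript v3; the bucket names are examples that match the IAM policy in the next step, and they must be globally unique (outside us-east-1 you would also pass a LocationConstraint):
// create-buckets.mjs — optional: create the source and destination buckets from a script
import { S3Client, CreateBucketCommand } from "@aws-sdk/client-s3";

const s3 = new S3Client({ region: "us-east-1" });

// Example names; replace with your own globally unique bucket names.
await s3.send(new CreateBucketCommand({ Bucket: "my-app-raw-data-source" }));
await s3.send(new CreateBucketCommand({ Bucket: "my-app-processed-data-dest" }));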
Your Lambda function needs permission to interact with other AWS services. We'll create a specific role for it.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::my-app-raw-data-source/*"
    },
    {
      "Effect": "Allow",
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::my-app-processed-data-dest/*"
    }
  ]
}
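The permissions above cover S3 access. The role also needs a trust policy that lets the Lambda service assume it (plus the AWS-managed AWSLambdaBasicExecutionRole policy if you want CloudWatch logging). A standard Lambda trust policy looks like this:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "lambda.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}
If you create the role through the Lambda console's "Create a new role" option, this trust policy is added for you automatically.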
This is our orchestrator. The code is surprisingly simple because all the heavy lifting is offloaded to transform.do.
Now, replace the default index.mjs code with the following:
import { S3Client, GetObjectCommand, PutObjectCommand } from "@aws-sdk/client-s3";
import axios from "axios";

// Initialize S3 client
const s3Client = new S3Client({});

// Retrieve secrets from environment variables
const TRANSFORM_DO_ENDPOINT = process.env.TRANSFORM_DO_ENDPOINT;
const TRANSFORM_DO_API_KEY = process.env.TRANSFORM_DO_API_KEY;
const DESTINATION_BUCKET = process.env.DESTINATION_BUCKET;

// Helper to stream data from S3
const streamToString = (stream) =>
  new Promise((resolve, reject) => {
    const chunks = [];
    stream.on("data", (chunk) => chunks.push(chunk));
    stream.on("error", reject);
    stream.on("end", () => resolve(Buffer.concat(chunks).toString("utf8")));
  });

export const handler = async (event) => {
  try {
    // 1. Get the bucket and key from the trigger event
    const sourceBucket = event.Records[0].s3.bucket.name;
    const sourceKey = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
    console.log(`New file detected: ${sourceKey} in bucket ${sourceBucket}`);

    // 2. Read the raw file from the source S3 bucket
    const getObjectParams = { Bucket: sourceBucket, Key: sourceKey };
    const { Body: sourceStream } = await s3Client.send(new GetObjectCommand(getObjectParams));
    const sourceDataString = await streamToString(sourceStream);
    const sourceData = JSON.parse(sourceDataString);

    // 3. Delegate the transformation to transform.do
    console.log("Sending data to transform.do agent...");
    const response = await axios.post(
      TRANSFORM_DO_ENDPOINT,
      { source: sourceData }, // The payload must contain the source data
      {
        headers: {
          'Authorization': `Bearer ${TRANSFORM_DO_API_KEY}`,
          'Content-Type': 'application/json'
        }
      }
    );

    // transform.do returns the transformed data in the 'data' field
    const transformedData = response.data.data;
    console.log("Transformation successful. Received transformed data.");

    // 4. Write the new, transformed data to the destination bucket
    // The output will be a CSV string as defined in our agent
    const destinationKey = sourceKey.replace('.json', '.csv');
    const putObjectParams = {
      Bucket: DESTINATION_BUCKET,
      Key: destinationKey,
      Body: transformedData,
      ContentType: 'text/csv'
    };
    await s3Client.send(new PutObjectCommand(putObjectParams));

    console.log(`Successfully processed and saved to ${destinationKey} in ${DESTINATION_BUCKET}`);
    return { statusCode: 200, body: 'Transformation complete.' };
  } catch (error) {
    console.error("Error during transformation pipeline:", error);
    // For production, add more robust error handling/notifications
    throw error;
  }
};
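Once you've installed axios locally (covered next), you can smoke-test the handler before deploying by invoking it with a hand-built event. A minimal sketch, assuming local AWS credentials, the three environment variables set in your shell, and a users.json object already uploaded to the source bucket; the event contains only the fields the handler actually reads:
// local-test.mjs — invoke the handler locally with a hand-built S3 event
import { handler } from "./index.mjs";

const fakeEvent = {
  Records: [
    {
      s3: {
        bucket: { name: "my-app-raw-data-source" },
        object: { key: "users.json" }
      }
    }
  ]
};

await handler(fakeEvent);
console.log("Local invocation finished.");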
Before you can use this, you need to add axios. Create a package.json file on your local machine with { "dependencies": { "axios": "^1.6.0" } }, run npm install, then zip the resulting node_modules folder and your index.mjs file together. Upload this .zip file under the Code source section of your Lambda.
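As an aside: if you deploy on the nodejs18.x runtime or newer, you could skip the axios dependency (and the zip upload) entirely and use the runtime's built-in fetch. A sketch of the equivalent call for step 3 of the handler:
// Alternative to axios on nodejs18.x+ runtimes: the global fetch API
const response = await fetch(TRANSFORM_DO_ENDPOINT, {
  method: "POST",
  headers: {
    "Authorization": `Bearer ${TRANSFORM_DO_API_KEY}`,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({ source: sourceData })
});

if (!response.ok) {
  throw new Error(`transform.do request failed with status ${response.status}`);
}

// As before, the transformed output is returned in the 'data' field
const transformedData = (await response.json()).data;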
The final steps are to connect everything. In the Lambda console, add an S3 trigger on your source bucket for object-created events (optionally filtered to the .json suffix), and set the three environment variables the code reads: TRANSFORM_DO_ENDPOINT, TRANSFORM_DO_API_KEY, and DESTINATION_BUCKET. If you'd rather script the trigger, see the sketch below.
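Here is a minimal sketch of scripting the trigger with the SDK. The function ARN is a placeholder for your own, and you'd also need to grant S3 permission to invoke the function (for example via aws lambda add-permission); the console does both steps for you automatically.
// configure-trigger.mjs — optional: wire the source bucket to the Lambda from a script
import { S3Client, PutBucketNotificationConfigurationCommand } from "@aws-sdk/client-s3";

const s3 = new S3Client({ region: "us-east-1" });

await s3.send(new PutBucketNotificationConfigurationCommand({
  Bucket: "my-app-raw-data-source",
  NotificationConfiguration: {
    LambdaFunctionConfigurations: [
      {
        // Placeholder ARN — replace with your function's ARN.
        LambdaFunctionArn: "arn:aws:lambda:us-east-1:123456789012:function:YOUR_FUNCTION_NAME",
        Events: ["s3:ObjectCreated:*"],
        // Only react to JSON uploads.
        Filter: { Key: { FilterRules: [{ Name: "suffix", Value: ".json" }] } }
      }
    ]
  }
}));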
You're all set! To test the entire flow, save the following records as users.json and upload the file to your source bucket:
[
{ "user_id": 101, "first_name": "Jane", "last_name": "Doe", "join_date": "2023-01-15T10:00:00Z" },
{ "user_id": 102, "first_name": "John", "last_name": "Smith", "join_date": "2023-02-20T12:30:00Z" }
]
Within a few seconds, a users.csv file should appear in your destination bucket. Download and open it; its content should be:
ID,FullName,MemberSince
101,"Jane Doe",2023-01-15
102,"John Smith",2023-02-20
It worked! You've successfully built a fully automated, serverless ETL pipeline.
By integrating transform.do with AWS Lambda, you've built more than just a pipeline; you've created a maintainable and scalable data processing foundation.
Ready to stop wrestling with ETL scripts and start building intelligent data workflows? Sign up for transform.do and turn your most complex data challenges into simple, powerful API calls.