Stream data to AWS S3 as a CSV file with Axios

Stream JSON response to AWS S3 as a CSV file with Axios

I’m assuming that, like me, you ran into a challenge where you needed to stream a large JSON response (not a file) without storing the entire JSON array in memory before processing it and uploading it as a document.

When you stream a JSON response with Axios, the chunks of data arrive as substrings of the entire JSON response. One approach would be to append all the chunks into a single string and run JSON.parse on the final string once the stream ends.

let data = '';
const response = await axios.get(url, { responseType: 'stream' });
response.data.on('data', (chunk) => {
    data += chunk;
});
response.data.on('end', () => {
    // Only parse once the entire response has been received
    const parsedData = JSON.parse(data);
});

This approach certainly works, but depending on the size of the records it is not memory efficient: for over a hundred thousand records you could be looking at memory usage of over 1GB. Having multiple users make concurrent requests, with each request holding over 1GB, is definitely not an optimal use of system resources.
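If you want to see this for yourself, Node’s built-in process.memoryUsage() gives a rough picture of the heap consumed by the accumulated string and the parsed array. A minimal sketch, assuming the naive example above (the exact numbers will depend on your dataset):

// Rough heap check around the naive parse; output varies with your dataset
const logHeap = (label) =>
  console.log(`${label}: ${Math.round(process.memoryUsage().heapUsed / 1024 / 1024)} MB heap used`);

response.data.on('end', () => {
  logHeap('before parse');
  const parsedData = JSON.parse(data);
  logHeap('after parse');
});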

In order to convert the chunks of data into a format that can be uploaded as CSV, you need to parse the data from text into a compatible format. This can be achieved by using AsyncParser from the @json2csv/node npm package.

The steps below highlight how to go about it.

Step 1

Install the @json2csv/node package

npm i --save @json2csv/node
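
The later steps also use axios for the HTTP request and the AWS SDK v3 packages for the upload, so install those as well if they aren’t already part of your project:

npm i --save axios @aws-sdk/client-s3 @aws-sdk/lib-storage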

Step 2

Define your parser

const { AsyncParser } = require('@json2csv/node');

const opts = {
  fields: ['header1', 'header2', 'header3']
};
const transformOpts = {};
const asyncOpts = {};
const parser = new AsyncParser(opts, asyncOpts, transformOpts);

For further configuration options, you can check out the docs.
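
If you need more control over the output, the opts object accepts further json2csv options. A small sketch, with made-up field names, showing renamed columns and a different delimiter:

// Sketch of a more customised parser config; field and label names are illustrative
const opts = {
  fields: [
    { label: 'First Header', value: 'header1' },
    { label: 'Second Header', value: 'header2' },
  ],
  delimiter: ';', // semicolon-separated output instead of commas
  withBOM: true,  // prepend a BOM so spreadsheet tools detect UTF-8 correctly
};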

Step 3

Make use of your parser to parse your stream data and generate a PassThrough stream

const axios = require('axios');
const { PassThrough } = require('stream');

function generateStream(url) {
  // Create a PassThrough stream to pipe data through
  const passThroughStream = new PassThrough();

  // Stream JSON data from the URL using Axios
  axios
    .get(url, { responseType: 'stream' })
    .then((response) => {
      // Pipe the parsed response stream to the PassThrough stream
      parser.parse(response.data).pipe(passThroughStream);
    })
    .catch((error) => {
      // Propagate request failures to consumers of the stream
      passThroughStream.destroy(error);
    });

  return passThroughStream;
}

The generateStream function makes an API call with { responseType: 'stream' } to tell Axios that the data should be returned as a continuous stream instead of a single chunk. Using a stream is memory efficient: instead of loading all the records into memory at once, you only load a chunk into memory and process it.
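
Before wiring this up to S3, a quick way to sanity-check the output is to pipe the generated stream to a local file. A rough test, where the endpoint URL is a placeholder:

const fs = require('fs');

// Local test of generateStream; 'https://api.example.com/records' is a stand-in URL
generateStream('https://api.example.com/records')
  .pipe(fs.createWriteStream('records.csv'))
  .on('finish', () => console.log('CSV written to records.csv'))
  .on('error', (err) => console.error('Failed to write CSV:', err));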

Step 4

Upload your data to S3

const { S3Client } = require('@aws-sdk/client-s3');
const { Upload } = require('@aws-sdk/lib-storage');

const s3Client = new S3Client({
  region: 'region',
  credentials: {
    accessKeyId: 'access_key',
    secretAccessKey: 'secret_key',
  },
});

async function streamJsonToS3(url, s3Bucket, s3Key) {
  try {
    const passThroughStream = generateStream(url);

    // Define the S3 upload parameters
    const uploadParams = {
      Bucket: s3Bucket,
      Key: s3Key,
      Body: passThroughStream,
      ContentType: 'text/csv', // Set the content type to CSV
    };

    // Create a managed upload that streams the data to S3
    const upload = new Upload({
      client: s3Client,
      params: uploadParams,
    });

    await upload.done();

    console.log(`Successfully uploaded JSON data to S3 as CSV at ${s3Bucket}/${s3Key}`);
  } catch (error) {
    console.error('Error streaming and uploading data:', error);
  }
}

The PassThrough stream returned by generateStream is then handed to S3 for upload.
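
Putting it all together, a call might look like this (the URL, bucket, and key are placeholders):

// Hypothetical invocation; replace the URL, bucket, and key with your own values
streamJsonToS3(
  'https://api.example.com/records',
  'my-bucket',
  'exports/records.csv'
).then(() => console.log('Done'));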

Conclusion

By streaming data to AWS S3 as a CSV file using Axios, you can significantly enhance the performance and efficiency of your application. This method allows for handling large datasets without overwhelming memory resources, as data is processed in manageable chunks. Utilizing AsyncParser from the @json2csv/node package ensures that the data is accurately parsed and formatted, while the use of a PassThrough stream facilitates seamless data transfer to S3. Overall, this approach optimizes resource usage and maintains consistent performance regardless of the volume of data being processed.