source

s3.upload()에 스트림 파이프 연결

factcode 2023. 9. 1. 21:23
반응형

s3.upload()에 스트림 파이프 연결

저는 현재 s3-upload-stream이라는 node.js 플러그인을 사용하여 매우 큰 파일을 Amazon S3로 스트리밍하고 있습니다.그것은 멀티파트 API를 사용하며 대부분 매우 잘 작동합니다.

그러나 이 모듈은 사용 기간이 표시되어 있으며 이미 수정해야 했습니다(작성자도 이 모듈을 사용하지 않음).오늘 저는 아마존에서 다른 문제에 부딪혔는데, 저는 정말로 저자의 추천을 받아 공식적인 aws-sdk를 사용하여 업로드를 시작하고 싶습니다.

그렇지만.

공식 SDK는 다음을 위한 배관을 지원하지 않는 것 같습니다.s3.upload()s3.upload에 입니다.

다양한 파일 처리를 수행하는 약 120개 이상의 사용자 코드 모듈을 보유하고 있으며 출력의 최종 대상에 구애받지 않습니다.엔진은 파이프 가능한 쓰기 가능한 출력 스트림을 제공하고, 그들은 파이프로 연결합니다.나는 그들에게 줄 수 없습니다.AWS.S3이의를 제기하고 전화해 달라고 요청합니다.upload()모든 모듈에 코드를 추가하지 않고 사용할 수 있습니다.가 사한이를 사용한 는 용유유입니다.s3-upload-stream파이프를 지지했기 때문입니다.

요?s3.upload()내가 파이프로 연결할 수 있는 무언가?

S3를 감습니다.upload()와 합니다.stream.PassThrough()개울.

다음은 예입니다.

inputStream
  .pipe(uploadFromStream(s3));

function uploadFromStream(s3) {
  var pass = new stream.PassThrough();

  var params = {Bucket: BUCKET, Key: KEY, Body: pass};
  s3.upload(params, function(err, data) {
    console.log(err, data);
  });

  return pass;
}

답이 조금 늦으면 다른 사람에게 도움이 될 수도 있습니다.쓰기 가능한 스트림과 약속을 모두 반환하여 업로드가 완료되면 응답 데이터를 얻을 수 있습니다.

const AWS = require('aws-sdk');
const stream = require('stream');

const uploadStream = ({ Bucket, Key }) => {
  const s3 = new AWS.S3();
  const pass = new stream.PassThrough();
  return {
    writeStream: pass,
    promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
  };
}

그리고 다음과 같이 기능을 사용할 수 있습니다.

const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'});
const readStream = fs.createReadStream('/path/to/yourfile.mp4');

const pipeline = readStream.pipe(writeStream);

이제 약속을 확인할 수 있습니다.

promise.then(() => {
  console.log('upload completed successfully');
}).catch((err) => {
  console.log('upload failed.', err.message);
});

또는 비동기/대기 사용:

try {
    await promise;
    console.log('upload completed successfully');
} catch (error) {
    console.log('upload failed.', error.message);
}

또는 로서stream.pipe()스트림을 반환합니다.대상(위의 writeStream 변수)인 쓰기 가능(writeStream)을 사용하면 파이프 체인을 사용할 수도 있습니다.

 pipeline.on('close', () => {
   console.log('upload successful');
 });
 pipeline.on('error', (err) => {
   console.log('upload failed', err.message)
 });

승인된 답변에서는 업로드가 완료되기 전에 기능이 종료되므로 잘못된 것입니다.아래 코드는 판독 가능한 스트림에서 올바르게 파이프됩니다.

업로드 참조

async function uploadReadableStream(stream) {
  const params = {Bucket: bucket, Key: key, Body: stream};
  return s3.upload(params).promise();
}

async function upload() {
  const readable = getSomeReadableStream();
  const results = await uploadReadableStream(readable);
  console.log('upload complete', results);
}

더 다을사한단더계나진정행출수있습다도니력보할를음률아가용여하를 하여 진행률 정보를 .ManagedUpload이와 같이:

const manager = s3.upload(params);
manager.on('httpUploadProgress', (progress) => {
  console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
});

관리 업로드 참조

사용 가능한 이벤트 목록

AWS SDK v3 :)에 대한 답변을 업데이트할 가치가 있다고 생각합니다.

S3가 .upload 더이 기하더 고능 상더 ▁function▁the 기고.@aws-sdk/lib-storage 대신 https://github.com/aws/aws-sdk-js-v3/blob/main/lib/lib-storage/README.md 에 따라 패키지가 제안됩니다.

따라서 결과 스니펫은 다음과 같습니다.

import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
const stream = require('stream');

...

const client = new S3Client({
  credentials: {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
  },
  region: process.env.AWS_DEFAULT_REGION,
});

...

async function uploadStream(readableStream) {

  const Key = 'filename.pdf'; 
  const Bucket = 'bucket-name';
  const passThroughStream = new stream.PassThrough();

  let res;

  try {
    const parallelUploads3 = new Upload({
      client,
      params: {
        Bucket,
        Key,
        Body: passThroughStream,
        ACL:'public-read',
      },
      queueSize: 4,
      partSize: 1024 * 1024 * 5,
      leavePartsOnError: false,
    });

    readableStream.pipe(passThroughStream);
    res = await parallelUploads3.done();
  } catch (e) {
    console.log(e);
  }

  return res;
}

저는 다음과 같은 것을 원했기 때문에 어떤 대답도 할 수 없었습니다.

  • 파이프를 연결s3.upload()
  • 의 결과를 파이프로 연결합니다.s3.upload()다른 시냇물로.

받아들여진 대답은 후자에 도움이 되지 않습니다.나머지는 스트림 파이프로 작업할 때 번거로운 Promise API에 의존합니다.

이것은 제가 수락한 답변을 수정한 것입니다.

const s3 = new S3();

function writeToS3({Key, Bucket}) {
  const Body = new stream.PassThrough();

  s3.upload({
    Body,
    Key,
    Bucket: process.env.adpBucket
  })
   .on('httpUploadProgress', progress => {
       console.log('progress', progress);
   })
   .send((err, data) => {
     if (err) {
       Body.destroy(err);
     } else {
       console.log(`File uploaded and available at ${data.Location}`);
       Body.destroy();
     }
  });

  return Body;
}

const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket});

pipeline.on('close', () => {
  // upload finished, do something else
})
pipeline.on('error', () => {
  // upload wasn't successful. Handle it
})

스크립트 솔루션 유형:
이 예에서는 다음을 사용합니다.

import * as AWS from "aws-sdk";
import * as fsExtra from "fs-extra";
import * as zlib from "zlib";
import * as stream from "stream";

그리고 비동기 함수:

public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> { 

         const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => {
            const passT = new stream.PassThrough();
            return {
              writeStream: passT,
              promise: S3.upload({ Bucket, Key, Body: passT }).promise(),
            };
          };
        const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key);
        fsExtra.createReadStream(filePath).pipe(writeStream);     //  NOTE: Addition You can compress to zip by  .pipe(zlib.createGzip()).pipe(writeStream)
        let output = true;
        await promise.catch((reason)=> { output = false; console.log(reason);});
        return output;
}

이 메서드를 다음과 같은 위치에 호출합니다.

let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);

위의 가장 일반적인 답변에서 주목해야 할 점은 다음과 같습니다.다음과 같이 파이프를 사용하는 경우에는 기능에서 패스를 반환해야 합니다.

fs.createReadStream(<filePath>).pipe(anyUploadFunction())

function anyUploadFunction () { 
 let pass = new stream.PassThrough();
 return pass // <- Returning this pass is important for the stream to understand where it needs to write to.
}

그렇지 않으면 오류를 던지지 않고 자동으로 다음으로 이동하거나 다음 오류를 던집니다.TypeError: dest.on is not a function함수를 작성한 방법에 따라

다른 답변에 이어 Node.js용 최신 AWS SDK를 사용하면 s3 업로드() 함수가 wait 구문과 S3의 약속을 사용하여 스트림을 수신하므로 훨씬 더 깨끗하고 간단한 솔루션이 있습니다.

var model = await s3Client.upload({
    Bucket : bucket,
    Key : key,
    ContentType : yourContentType,
    Body : fs.createReadStream(path-to-file)
}).promise();

s3 api 업로드 기능과 0바이트 파일을 사용하면 s3(@Radar155, @gabo)에 도달한다고 불평하는 사람들을 위해, 저도 이런 문제가 있었습니다.

두 번째 패스 만들기스트림을 통과하고 첫 번째 데이터에서 두 번째 데이터로 모든 데이터를 파이프로 연결하고 두 번째 데이터에 대한 참조를 s3로 전달합니다.이 작업은 두 가지 방법으로 수행할 수 있습니다. 첫 번째 스트림에서 "데이터" 이벤트를 수신한 다음 두 번째 스트림에 동일한 데이터를 쓰는 것이 더티한 방법일 수 있습니다. 두 번째 스트림에서 "종료" 이벤트와 유사하게 종료 함수를 호출합니다.저는 이것이 aws api의 버그인지, 노드의 버전인지, 아니면 다른 문제인지는 전혀 모르겠습니다. 하지만 저는 이 문제를 해결할 수 있었습니다.

다음과 같이 표시됩니다.

var PassThroughStream = require('stream').PassThrough;
var srcStream = new PassThroughStream();

var rstream = fs.createReadStream('Learning/stocktest.json');
var sameStream = rstream.pipe(srcStream);
// interesting note: (srcStream == sameStream) at this point
var destStream = new PassThroughStream();
// call your s3.upload function here - passing in the destStream as the Body parameter
srcStream.on('data', function (chunk) {
    destStream.write(chunk);
});

srcStream.on('end', function () {
    dataStream.end();
});

클라이언트에서 s3로 성공적으로 스트리밍할 수 있는 사용자에게 도움이 되는 경우:

https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a

서버 측 코드는 다음과 같이 가정합니다.req스트림 개체입니다. 이 경우 클라이언트에서 헤더에 파일 정보가 설정된 상태로 전송되었습니다.

const fileUploadStream = (req, res) => {
  //get "body" args from header
  const { id, fn } = JSON.parse(req.get('body'));
  const Key = id + '/' + fn; //upload to s3 folder "id" with filename === fn
  const params = {
    Key,
    Bucket: bucketName, //set somewhere
    Body: req, //req is a stream
  };
  s3.upload(params, (err, data) => {
    if (err) {
      res.send('Error Uploading Data: ' + JSON.stringify(err) + '\n' + JSON.stringify(err.stack));
    } else {
      res.send(Key);
    }
  });
};

네, 관례에 어긋나긴 하지만 요점을 살펴보면 멀터, 버스보이 등을 사용하여 찾은 다른 어떤 것보다 훨씬 깨끗합니다.

실용주의에 대한 +1 그리고 그의 도움에 대한 @Salehen Rahman에게 감사합니다.

AWS 노드 SDK v3를 사용하는 경우 스트림/블롭/버퍼를 업로드하기 위한 전용 모듈이 있습니다.

https://www.npmjs.com/package/ @aws-vlan/lib-storage

KnexJS를 사용하고 있는데 스트리밍 API 사용에 문제가 있었습니다.드디어 고쳤으니 다음 내용이 누군가에게 도움이 되길 바랍니다.

const knexStream = knex.select('*').from('my_table').stream();
const passThroughStream = new stream.PassThrough();

knexStream.on('data', (chunk) => passThroughStream.write(JSON.stringify(chunk) + '\n'));
knexStream.on('end', () => passThroughStream.end());

const uploadResult = await s3
  .upload({
    Bucket: 'my-bucket',
    Key: 'stream-test.txt',
    Body: passThroughStream
  })
  .promise();

성을 합니다.new stream.PassThrough()그리고.pipe입력 스트림을 입력한 다음 패스스루 인스턴스를 본문으로 전달합니다.

다음 예를 확인합니다.

function upload(s3, inputStream) {
    const pass = new PassThrough();

    inputStream.pipe(pass);

    return s3.upload(
        {
            Bucket: 'bucket name',
            Key: 'unique file name',
            Body: pass,
        },
        {
            queueSize: 4, // default concurrency
        },
    ).promise()
        .then((data) => console.log(data))
        .catch((error) => console.error(error));
}

스트림의 크기를 알고 있는 경우 minio-js를 사용하여 다음과 같이 스트림을 업로드할 수 있습니다.

  s3Client.putObject('my-bucketname', 'my-objectname.ogg', stream, size, 'audio/ogg', function(e) {
    if (e) {
      return console.log(e)
    }
    console.log("Successfully uploaded the stream")
  })

언급URL : https://stackoverflow.com/questions/37336050/pipe-a-stream-to-s3-upload

반응형