Serverless Operations, inc

>_cd /blog/id_lqc4g0ofjok

title

Amazon OpenSearch ServerlessでBulk Import時に発生する429エラー(Too many requests)への対処

summary

Amazon OpenSearch Serverless及びAmazon OpenSearch Serviceで大量のデータをBulk Importすると429エラー(Too many requests)が発生する場合があります。見ての通り短時間で大量にデータをインポートする際に発生する可能性のあるエラーです。Amazon OpenSearch Serviceであれば、一時的にインスタンスサイズを上げるなど、AWS側での対応が可能になりますが、Amazon OpenSearch Serverlessの場合はインフラ側の設定を変えることがほとんどできません。それではOpenSearch Serverlessではどのように対応が可能なのか本ブログではまとめました。

対策1:アプリケーションロジックに指数関数的再試行ロジックを追加する

書き込み拒否としての 429 エラーメッセージはOpenSearch内部のバルクキューがいっぱいになった時に発生します。キューが一杯で新しいリクエストを拒否している状態ですね。ここでBulk Importを実行しているアプリケーションロジックに指数関数的再試行ロジックというものを追加します。

簡単に言うと、書き込みが失敗するたびに2秒 -> 4秒 -> 8秒という形で次の再試行までの時間間隔を伸ばしてリトライを行うロジックです。以下がそれを実装しているコードになります。S3バケットに上がったJSONファイルのリストを取得してその中にあるデータをBulkImport用にまとめてデータの投入を実行しています。

429エラーが返ってきたらsleepによって処理を止めて再実行しています。

const ossClient = new Client({
  ...AwsSigv4Signer({
    region: process.env.AWS_REGION,
    service: ‘aoss’,
    getCredentials: () => {
      const credentialsProvider = defaultProvider()
      return credentialsProvider()
    },
  }),
  node: `https://${process.env.OPENSEARCHSERVERLESS_URL}.${process.env.AWS_REGION}.aoss.amazonaws.com`,
})

interface CsvEvent {
  basepath: string
  key: string
}

export const handler: Handler<CsvEvent> = async (event) => {
  const bucket = process.env.S3_BUCKET
  const prefix = `${event.basepath}/chunk-for-bulk/`
  const keys = await listS3JsonKeys(bucket, prefix)

  const MAX_RETRIES = 20 // リトライの上限回数

  // prefix配下にあるJSONファイルを1ファイルずつBulkInsertする
  for (let i = 0; i < keys.length; i++) {
    const key = keys[i]
    let attempt = 0

    // 429エラーが返ってきたらリトライし続ける仕組み
    while (attempt < MAX_RETRIES) {
      attempt++
      const docs = await loadJsonArrayFromS3(bucket, key)

      // BulkInsert用のPayload作成
      const bulkPayload: Array<Record<string, unknown>> = (
        docs as ConstructionDocument[]
      ).flatMap((doc) => [
        {
          index: {
            _index: ‘construction-info’,
          },
        },
        doc,
      ])

      // BulkInsertの実行
      const res = await ossClient.bulk({ body: bulkPayload })

      // 429エラーが帰ってきたときはリトライを実施
      if (res.body.errors === true && res.body.items[0].index.status === 429) {
        console.warn(
          `BulkInsertエラー試行回数 ${attempt} 回、ソースファイル:${key}`,
        )

        if (attempt >= MAX_RETRIES) {
          throw new Error(
            `429エラーにより、最高試行回数${attempt}回を超えました`,
          )
        }

        // 再試行が失敗するたびに間隔を伸ばす
        await sleep(2000 * attempt)
        continue
      }
      break // while抜けて次のkeyへ
    }
  }
}

対策2:BulkImport単体で処理する件数を減らす

これも重要な対策になります。公式ドキュメントでは大体5-15MB程度のサイズが推奨されています。このあたりは実装をやりながら調整していけばいいでしょう。

AWS StepFunctionsを使ったBulkImport処理の実装

AWS Lambda単体で指数関数的再試行ロジックを実装すると15分のタイムアウトに引っかかってしまう可能性は高いでしょう。そんなときはStepFuntionsのMapStateで処理を分割することをおすすめします。1回のLambdaファンクションが5分程度で終わるくらいを目指してLambdaに処理させるBulkデータの件数を調整しましょう。

そして以下のようにMapStateのmaxConcurrencyを1に設定すれば1つのLambdaファンクションが終わるたびに次のファンクションを実行してくれるのでより安全に処理が組むことが出来ます。

// Mapステート
const map = new sfn.Map(this, ‘Map’, {
  itemsPath: sfn.JsonPath.stringAt(‘$.keys’),
  resultPath: sfn.JsonPath.stringAt(‘$.array’),
  maxConcurrency: 1,
})
Written by
CEO

堀家 隆宏

Takahiro Horike

  • Facebook->
  • X->
  • GitHub->

Share

Facebook->X->
Back
to list
<-