対策1:アプリケーションロジックに指数関数的再試行ロジックを追加する
書き込み拒否としての 429 エラーメッセージはOpenSearch内部のバルクキューがいっぱいになった時に発生します。キューが一杯で新しいリクエストを拒否している状態ですね。ここでBulk Importを実行しているアプリケーションロジックに指数関数的再試行ロジックというものを追加します。
簡単に言うと、書き込みが失敗するたびに2秒 -> 4秒 -> 8秒という形で次の再試行までの時間間隔を伸ばしてリトライを行うロジックです。以下がそれを実装しているコードになります。S3バケットに上がったJSONファイルのリストを取得してその中にあるデータをBulkImport用にまとめてデータの投入を実行しています。
429エラーが返ってきたらsleepによって処理を止めて再実行しています。
const ossClient = new Client({
...AwsSigv4Signer({
region: process.env.AWS_REGION,
service: ‘aoss’,
getCredentials: () => {
const credentialsProvider = defaultProvider()
return credentialsProvider()
},
}),
node: `https://${process.env.OPENSEARCHSERVERLESS_URL}.${process.env.AWS_REGION}.aoss.amazonaws.com`,
})
interface CsvEvent {
basepath: string
key: string
}
export const handler: Handler<CsvEvent> = async (event) => {
const bucket = process.env.S3_BUCKET
const prefix = `${event.basepath}/chunk-for-bulk/`
const keys = await listS3JsonKeys(bucket, prefix)
const MAX_RETRIES = 20 // リトライの上限回数
// prefix配下にあるJSONファイルを1ファイルずつBulkInsertする
for (let i = 0; i < keys.length; i++) {
const key = keys[i]
let attempt = 0
// 429エラーが返ってきたらリトライし続ける仕組み
while (attempt < MAX_RETRIES) {
attempt++
const docs = await loadJsonArrayFromS3(bucket, key)
// BulkInsert用のPayload作成
const bulkPayload: Array<Record<string, unknown>> = (
docs as ConstructionDocument[]
).flatMap((doc) => [
{
index: {
_index: ‘construction-info’,
},
},
doc,
])
// BulkInsertの実行
const res = await ossClient.bulk({ body: bulkPayload })
// 429エラーが帰ってきたときはリトライを実施
if (res.body.errors === true && res.body.items[0].index.status === 429) {
console.warn(
`BulkInsertエラー試行回数 ${attempt} 回、ソースファイル:${key}`,
)
if (attempt >= MAX_RETRIES) {
throw new Error(
`429エラーにより、最高試行回数${attempt}回を超えました`,
)
}
// 再試行が失敗するたびに間隔を伸ばす
await sleep(2000 * attempt)
continue
}
break // while抜けて次のkeyへ
}
}
}
対策2:BulkImport単体で処理する件数を減らす
これも重要な対策になります。公式ドキュメントでは大体5-15MB程度のサイズが推奨されています。このあたりは実装をやりながら調整していけばいいでしょう。
AWS StepFunctionsを使ったBulkImport処理の実装
AWS Lambda単体で指数関数的再試行ロジックを実装すると15分のタイムアウトに引っかかってしまう可能性は高いでしょう。そんなときはStepFuntionsのMapStateで処理を分割することをおすすめします。1回のLambdaファンクションが5分程度で終わるくらいを目指してLambdaに処理させるBulkデータの件数を調整しましょう。
そして以下のようにMapStateのmaxConcurrencyを1に設定すれば1つのLambdaファンクションが終わるたびに次のファンクションを実行してくれるのでより安全に処理が組むことが出来ます。
// Mapステート
const map = new sfn.Map(this, ‘Map’, {
itemsPath: sfn.JsonPath.stringAt(‘$.keys’),
resultPath: sfn.JsonPath.stringAt(‘$.array’),
maxConcurrency: 1,
})