Serverless Operations, inc

>_cd /blog/id_043

title

サーバーレスアーキテクチャでリアルタイムランキングAPIを作る

概要

WWDJAPANというメディアのお仕事を手伝ってまして、そこでリアルタイムにサイトへのアクセス数をカウントしてランキングを表示する仕組みをAWSで作った(まだ、サイト側には公開していませんが)のでそれの仕組みとかコードを共有したいと思います。

ちなみにGoogle AnalyticsのリアルタイムAPIを使う方法もあるのですが、APIの使用申請をGoogleに送ったところ、何故かスルーされてしまったので独自に作る方針としました。

アーキテクチャ

APIとして以下の2つ必要になります。そしてバックエンドにはKinesisをおいてアクセスデータをバッファさせる仕組みです。
– サイトのPVを収集するためのAPI
– ランキングの結果を取得するAPI(WebSocketにしても面白いと思う)

PV収集用API

API GatewayとKinesis StreamをAWS Service Proxyの機能を使って直接接続します。
このAPIはサイトの記事詳細ページに埋め込んで、ページが表示するたびにその記事IDをRanking Consumer Streamに対して記事IDをPutします。

ここのシャード数はサイトからの同時接続数によって変わってくるので、実際に埋め込んでみて調整するのが良いでしょう。
そして、Ranking Consumerの背後にいるLambdaでRanking Collector Streamにシャード毎のデータをまとめてPutします。

Ranking Collector Streamのシャードは1本固定にしています。これはランキング取得APIから1本のシャードに対してデータをGetすることで集計をしやすくするという意図があります。また、Ranking Consumer Streamのシャードを増減させてもLambda自体のコードには手を加えなくてよいというメリットもあります。

ランキング取得API

GetShardIteratorGetRecordsオペレーションで現在時刻から数分前までにputされたデータをKinesisから取り出します。

以下のように、ShardIteratorTypeAT_TIMESTAMPを指定すると指定の時刻以降にシャードにputされたデータのみを取得してくれるので、これを現在時刻から5分前までなどに指定することでリアルタイム性を出しています。

await aws.request('Kinesis', 'getShardIterator', {
  ShardId: 'shardId-000000000000',
  ShardIteratorType: 'AT_TIMESTAMP',
  StreamName: streamName,
  Timestamp: dt // ここに指定したタイムスタンプ以降にシャードにputされたデータのみを取得する
})

そして以下のようなランキングのJsonを生成してAPI に返却します。ランキング結果

[
  {
    "post_id": 795075,
    "count": 58
  },
  {
    "post_id": 794978,
    "count": 43
  },
  {
    "post_id": 794963,
    "count": 32
  },
  {
    "post_id": 754499,
    "count": 25
  },
  {
    "post_id": 795236,
    "count": 20
  },
  {
    "post_id": 795228,
    "count": 19
  },
  {
    "post_id": 795939,
    "count": 18
  },
  {
    "post_id": 795021,
    "count": 18
  },
  {
    "post_id": 795841,
    "count": 16
  },
  {
    "post_id": 793589,
    "count": 15
  }
]

構成管理

Serverless Frameworkを使用しています。以下がそのserverless.yamlファイルの全貌です。serverless.yml

service: realtime-ranking

provider:
  name: aws
  runtime: nodejs8.10
  stage: dev
  region: ap-northeast-1

functions:
  rankingCollector:
    handler: lambda/apiGateway/rankingCollector.handler
    events:
      - http:
          path: /ranking
          method: get
          cors: true
    environment:
      MINUITES_OF_RANKING_TERM: 10
      COUNT_OF_RANKING: 10
      COLLECTOR_STREAM_NAME: { Ref: 'RankingCollectorStream' }
    iamRoleStatements:
      - Effect: Allow
        Action:
           - 'kinesis:GetRecords'
           - 'kinesis:GetShardIterator'
        Resource: {"Fn::GetAtt":[ "RankingCollectorStream", "Arn" ]}
  rankingConsumer:
    handler: lambda/kinesisStreams/rankingConsumer.handler
    events:
      - stream:
          type: kinesis
          batchSize: 1000
          arn:
            Fn::GetAtt:
              - RankingConsumerStream
              - Arn
    environment:
      COLLECTOR_STREAM_NAME: { Ref: 'RankingCollectorStream' }
    iamRoleStatements:
      - Effect: Allow
        Action:
           - 'kinesis:PutRecord'
        Resource: {"Fn::GetAtt":[ "RankingCollectorStream", "Arn" ]}
custom:
  stage: ${opt:stage, self:provider.stage}
  apiGatewayServiceProxies:
    - kinesis:
        path: /ranking
        method: post
        streamName: { Ref: 'RankingConsumerStream' }
        cors: true

resources:
  Resources:
    RankingConsumerStream:
      Type: AWS::Kinesis::Stream
      Properties:
        ShardCount: 3
        Name: ${self:service}-${self:custom.stage}-ranking-consumer
    RankingCollectorStream:
      Type: AWS::Kinesis::Stream
      Properties:
        ShardCount: 1
        Name: ${self:service}-${self:custom.stage}-ranking-collector
plugins:
  - serverless-apigateway-service-proxy
  - serverless-iam-roles-per-function

今回、KinesisとAPI GatewayをLambdaを挟むことなく直接接続するために、Serverless API Gateway Service ProxyというServerless Frameworkのプラグインを開発して公開しました。

今までであれば、これを実現するために膨大なCloudFormationを記述しないといけなかったのですが、以下のようにたった数行で実現できてしまいます。是非使う機会などあれば、GitHubにスター付けてくれたりフィードバックもらえると嬉しいです。

custom:
  apiGatewayServiceProxies:
    - kinesis:
        path: /ranking
        method: post
        streamName: { Ref: 'RankingConsumerStream' }
        cors: true

というわけで、みなさまよいリアルタイムランキングライフを!

Written by
CEO

堀家 隆宏

Takahiro Horike

  • Facebook->
  • X->
  • GitHub->
Back
to list
<-