Configuring event streaming with AWS

The Carbon Engine can generate streamed events whenever a transaction or response is created/updated, and when transaction footprints are recalculated. This event stream is sent out through AWS EventBridge, which has a number of compatible adapters suitable for feeding the data into a variety of different systems.

Event streaming can be useful to attach third-party or bespoke systems to the Carbon Engine, such as for analytics, reporting over all customers, push notification services, or to add any additional functionality to Carbon Engine.

The event streaming functionality only works when S3 is being used for storage.

How it works

On creation, change, or deletion, of a file in S3, a notification of change event is sent to a listening SQS queue. This is a standard queue with no encryption configured on it, due to no sensitive information being sent in these events (only the filename is included).

The Carbon Engine periodically polls this SQS queue, and when a notification is received, it queries the changelog within the changed file. From this, a bundled set of change events is sent to the configured EventBridge.

To get events out of the EventBridge, you will need to write an AWS EventBridge Rule that reacts and forwards messages onto your follow-on processing.

NOTE: Downstream processing must be setup to handle duplicate event messages being sent

Assumptions

  1. Bebop cluster is already configured and running
  2. All resources will be based in the same region and account

Creating required AWS resources

  1. Create a standard, unencrypted, SQS queue.
  2. Amend the SQS queue's access policy by adding in the following statement to the Statement array, replacing REGION, ACCOUNT, QUEUE_NAME and S3_BUCKET_NAME with values that are applicable to your environment:
{
  "Sid": "s3-event-notification",
  "Effect": "Allow",
  "Principal": {
    "Service": "s3.amazonaws.com"
  },
  "Action": "sqs:SendMessage",
  "Resource": "arn:aws:sqs:REGION:ACCOUNT:QUEUE_NAME",
  "Condition": {
    "StringEquals": {
      "aws:SourceAccount": "ACCOUNT"
    },
    "ArnLike": {
      "aws:SourceArn": "arn:aws:s3:*:*:S3_BUCKET_NAME"
    }
  }
}
  1. Create an "event notification" on your S3 bucket. Select "All object create events" (s3:ObjectCreated:*) and "Permanently deleted" (s3:ObjectRemoved:Delete). Use your newly-created SQS queue as the destination for the events.
  2. Create an EventBridge event bus.
  3. Update the policy granted to the IAM user or role so that it can access SQS and EventBridge, replacing REGION, ACCOUNT, BUS_NAME and QUEUE_NAME with values that are applicable to your environment:
{
  "Sid": "streaming-access",
  "Effect": "Allow",
  "Action": ["events:PutEvents", "sqs:ReceiveMessage", "sqs:DeleteMessage"],
  "Resource": [
    "arn:aws:events:REGION:ACCOUNT:event-bus/BUS_NAME",
    "arn:aws:sqs:REGION:ACCOUNT:QUEUE_NAME"
  ]
}

Configuring the Carbon Engine

Pass in these two environment variables to the Carbon Engine container to enable event streaming:

Event format

Business events put on the event bus have a source of co.cogo.* and a detail type of persistence.

The detail field contains a JSON object with two keys: events, which holds an array of one or more events, and test_events, a boolean which indicates if all the attached events should be ignored by production systems. In general, you should only react to a message containing events if test_events is false.

Each "update" event is an object with the following keys:

Refer to the Get Transaction and Query user responses API endpoint documentation to learn about the format of the data object.

"Delete" events will also be sent when an S3 file is deleted. These events contain the following keys:

An example of an update entry follows:

{
  "test_events": false,
  "events": [
    {
      "user_id": "sample-user-id",
      "type": "txn",
      "write_counter": 14,
      "data": {
        "amount": "22.00",
        "category": "TAKEAWAY",
        "date": "2020-05-03",
        "description": "GOLDEN WOK TAKEAWAYS",
        "footprint": "15.39",
        "id": "19729376483",
        "revision_id": 15,
        "savings": [
          { "code": "diet", "type": "unrealised", "value": "19.491" },
          { "code": "diet", "type": "realised", "value": "5.19332" }
        ],
        "custom": {}
      }
    },
    {
      "user_id": "sample-user-id",
      "type": "responses",
      "write_counter": 14,
      "data": {
        "diet": [
          {
            "end_date": "2019-05-15",
            "start_date": null,
            "value": "all"
          },
          {
            "end_date": null,
            "start_date": "2019-05-16",
            "value": "vegetarian"
          }
        ]
      }
    }
  ]
}