Configuring event streaming with AWS
The Carbon Engine can generate streamed events whenever a transaction or response is created or updated, and when transaction footprints are recalculated. This event stream is sent out through AWS EventBridge, which has a number of compatible adapters for feeding the data into a variety of downstream systems.
Event streaming can be used to attach third-party or bespoke systems to the Carbon Engine, for example analytics, reporting across all customers, push notification services, or any other functionality built on top of the Carbon Engine.
The event streaming functionality only works when S3 is being used for storage.
How it works
On creation, change, or deletion of a file in S3, a change notification event is sent to a listening SQS queue. This is a standard queue with no encryption configured, since these events contain no sensitive information (only the filename is included).
The Carbon Engine periodically polls this SQS queue; when a notification is received, it reads the changelog within the changed file and sends a bundled set of change events to the configured EventBridge event bus.
To get events out of EventBridge, you will need to write an AWS EventBridge rule that matches these messages and forwards them on to your follow-on processing.
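As a sketch, a rule's event pattern could match all Carbon Engine messages by combining the co.cogo.* source prefix with the persistence detail type described under "Event format" below (the rule's target configuration is omitted here):

```json
{
  "source": [{ "prefix": "co.cogo." }],
  "detail-type": ["persistence"]
}
```

Attach this pattern to a rule on your event bus, then add a target (for example a Lambda function or another SQS queue) to receive the matched events.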
NOTE: Downstream processing must be set up to handle duplicate event messages being sent
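As a sketch of what that means in practice, a consumer could deduplicate on the user_id, type and write_counter fields described under "Event format" below. This in-memory example is illustrative only; a real consumer would persist seen keys in a durable store such as DynamoDB:

```python
# Deduplication sketch for at-least-once delivery: skip events whose
# (user_id, type, write_counter) key has already been processed.
seen: set[tuple] = set()

def handle_event(event: dict) -> bool:
    """Process one Carbon Engine event; return False if it is a duplicate."""
    key = (event["user_id"], event["type"], event.get("write_counter"))
    if key in seen:
        return False  # duplicate delivery: already handled
    seen.add(key)
    # ... real processing would go here ...
    return True
```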
Assumptions
- Bebop cluster is already configured and running
- All resources will be based in the same region and account
Creating required AWS resources
- Create a standard, unencrypted, SQS queue.
- Amend the SQS queue's access policy by adding the following statement to the Statement array, replacing REGION, ACCOUNT, QUEUE_NAME and S3_BUCKET_NAME with values applicable to your environment:
{
"Sid": "s3-event-notification",
"Effect": "Allow",
"Principal": {
"Service": "s3.amazonaws.com"
},
"Action": "sqs:SendMessage",
"Resource": "arn:aws:sqs:REGION:ACCOUNT:QUEUE_NAME",
"Condition": {
"StringEquals": {
"aws:SourceAccount": "ACCOUNT"
},
"ArnLike": {
"aws:SourceArn": "arn:aws:s3:*:*:S3_BUCKET_NAME"
}
}
}
- Create an "event notification" on your S3 bucket. Select "All object create events" (s3:ObjectCreated:*) and "Permanently deleted" (s3:ObjectRemoved:Delete). Use your newly-created SQS queue as the destination for the events.
- Create an EventBridge event bus.
- Update the policy granted to the IAM user or role so that it can access SQS and EventBridge, replacing REGION, ACCOUNT, BUS_NAME and QUEUE_NAME with values applicable to your environment:
{
"Sid": "streaming-access",
"Effect": "Allow",
"Action": ["events:PutEvents", "sqs:ReceiveMessage", "sqs:DeleteMessage"],
"Resource": [
"arn:aws:events:REGION:ACCOUNT:event-bus/BUS_NAME",
"arn:aws:sqs:REGION:ACCOUNT:QUEUE_NAME"
]
}
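If you are scripting the setup rather than using the console, the S3 event notification step above can be expressed as a bucket notification configuration. This is a sketch; the Id is an arbitrary name, and the queue ARN uses the same placeholders as the policies above:

```json
{
  "QueueConfigurations": [
    {
      "Id": "carbon-engine-events",
      "QueueArn": "arn:aws:sqs:REGION:ACCOUNT:QUEUE_NAME",
      "Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:Delete"]
    }
  ]
}
```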
Configuring the Carbon Engine
Pass in these two environment variables to the Carbon Engine container to enable event streaming:
- COGO_S3_EVENT_QUEUE is set to the full URL of the SQS event queue created above
- COGO_BUSINESS_EVENT_SINK is set to the name of the EventBridge event bus created above
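For example, the container environment might look like the following, using the same placeholders as the AWS resource steps above:

```
COGO_S3_EVENT_QUEUE=https://sqs.REGION.amazonaws.com/ACCOUNT/QUEUE_NAME
COGO_BUSINESS_EVENT_SINK=BUS_NAME
```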
Event format
Business events put on the event bus have a source of co.cogo.* and a detail type of persistence.
The detail field contains a JSON object with two keys: events, which holds an array of one or more events, and test_events, a boolean which indicates if all the attached events should be ignored by production systems. In general, you should only react to a message containing events if test_events is false.
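A minimal sketch of that filtering logic, assuming the message's detail field has already been parsed into a dict:

```python
def production_events(message_detail: dict) -> list:
    """Return the events a production consumer should act on.

    Messages flagged with test_events=True are ignored wholesale.
    """
    if message_detail.get("test_events"):
        return []
    return message_detail.get("events", [])
```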
Each "update" event is an object with the following keys:
- user_id: the user ID to which the event pertains
- type: either txn for a transaction creation/update or responses for a responses creation/update
- write_counter: a monotonically increasing number that specifies how many times this user's data record has been written to
- data: an object holding the event data, which differs depending on the type.
Refer to the Get Transaction and Query user responses API endpoint documentation to learn about the format of the data object.
"Delete" events will also be sent when an S3 file is deleted. These events contain the following keys:
- user_id: the user ID to which the event pertains
- type: deleted
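Putting the two event shapes together, a consumer might dispatch on the type field like this (a sketch; the handler bodies are hypothetical placeholders):

```python
def dispatch(event: dict) -> str:
    """Route a Carbon Engine event to a handler based on its type field."""
    handlers = {
        "txn": lambda e: f"transaction update for {e['user_id']}",
        "responses": lambda e: f"responses update for {e['user_id']}",
        "deleted": lambda e: f"user data deleted for {e['user_id']}",
    }
    try:
        return handlers[event["type"]](event)
    except KeyError:
        return f"unknown event type: {event.get('type')}"
```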
An example of an update entry follows:
{
"test_events": false,
"events": [
{
"user_id": "sample-user-id",
"type": "txn",
"write_counter": 14,
"data": {
"amount": "22.00",
"category": "TAKEAWAY",
"date": "2020-05-03",
"description": "GOLDEN WOK TAKEAWAYS",
"footprint": "15.39",
"id": "19729376483",
"revision_id": 15,
"savings": [
{ "code": "diet", "type": "unrealised", "value": "19.491" },
{ "code": "diet", "type": "realised", "value": "5.19332" }
],
"custom": {}
}
},
{
"user_id": "sample-user-id",
"type": "responses",
"write_counter": 14,
"data": {
"diet": [
{
"end_date": "2019-05-15",
"start_date": null,
"value": "all"
},
{
"end_date": null,
"start_date": "2019-05-16",
"value": "vegetarian"
}
]
}
}
]
}