Architecting Cloud Native Applications

Example – inverse oplock

This example demonstrates the most common form of the materialized view and its basic building blocks. We need to store the latest version of a select set of fields from a specific domain entity, and we need to account for events arriving out of order. On the authoring side of the equation, we typically need to account for multiple users updating the same record simultaneously, with one transaction overwriting another. Optimistic locking is the typical solution to this problem. We store a timestamp or sequence number on each record and change this value with each successful update. An update succeeds only if the current value in the database equals the value that was retrieved before the update. When this assertion fails, we throw an error and delegate the handling of the specific scenario to the user.
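The authoring-side check can be sketched in a few lines. This is only an illustration under stated assumptions: the `table` map stands in for a real database, and `update`, `OptimisticLockError`, and the `version` field are hypothetical names that do not come from the example in this section.

```javascript
// In-memory stand-in for a database table (hypothetical, for illustration only).
const table = new Map();

class OptimisticLockError extends Error {}

// The update succeeds only if the stored version still matches
// the version the caller retrieved before making its changes.
const update = (id, changes, expectedVersion) => {
  const current = table.get(id);
  if (current && current.version !== expectedVersion) {
    throw new OptimisticLockError(`Record ${id} was modified by another transaction`);
  }
  const version = (current ? current.version : 0) + 1;
  const next = { ...current, ...changes, id, version };
  table.set(id, next);
  return next;
};

// Two users read the same record at version 1, then both try to save.
table.set('42', { id: '42', name: 'Widget', version: 1 });
const readByFirstUser = table.get('42');
const readBySecondUser = table.get('42');

// The first save succeeds and bumps the version to 2.
update('42', { name: 'Widget v2' }, readByFirstUser.version);

// The second save still carries the stale version 1, so it is rejected.
let conflict = null;
try {
  update('42', { name: 'Gadget' }, readBySecondUser.version);
} catch (err) {
  conflict = err; // the authoring side hands this back to the user to resolve
}
```

The second user's change is rejected instead of silently overwriting the first, which is the behavior we want when people are editing the same record.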

On the consumer side of the equation, the events are facts that represent the history of an instance of a domain entity. We are not concerned with concurrency; we only want to make certain that we do not overwrite newer events with older events. We still assert on an optimistic lock value; however, our reaction to the failure of the assertion is the opposite on this end of the equation. Rather than raising an error, we can simply ignore older events when they arrive after newer events.
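To make the inverted reaction concrete, here is a minimal in-memory sketch. The `view` map and the `applyEvent` function are hypothetical stand-ins for the real table and stream processor; only the event shape (`timestamp`, `item.id`, `item.name`) follows the example in this section.

```javascript
// In-memory stand-in for the materialized view (hypothetical, for illustration only).
const view = new Map();

// Returns true when the event was applied and false when it was ignored as stale.
const applyEvent = (event) => {
  const existing = view.get(event.item.id);
  // Write only if no row exists yet or the stored oplock is strictly older.
  if (existing && existing.oplock >= event.timestamp) {
    return false; // an older (or duplicate) event arrived late: ignore it, no error
  }
  view.set(event.item.id, {
    id: event.item.id,
    name: event.item.name,
    oplock: event.timestamp,
  });
  return true;
};

// The event with timestamp 2000 arrives after the event with timestamp 3000.
const results = [
  { timestamp: 1000, item: { id: 'a', name: 'first' } },
  { timestamp: 3000, item: { id: 'a', name: 'third' } },
  { timestamp: 2000, item: { id: 'a', name: 'second' } },
].map(applyEvent);
```

After all three events are processed, the view still holds the data from the event with timestamp 3000. The late event is dropped without raising an error.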

The following is a fragment from a Serverless Framework serverless.yml file, which defines the AWS DynamoDB table that is used to store the materialized view. It is no different from most DynamoDB table definitions, which means that from the query perspective, this materialized view works just like any other DynamoDB table. The developer experience is the same. A typical alternative is to store the materialized view in a search engine. The query requirements will usually drive the decision regarding which database type is the most appropriate. For example, a search engine is usually the best choice for geospatial and natural language queries. When we only ever access an object by its identifier, it may be appropriate to persist the view in blob storage. The two are often combined, with a search engine holding the query fields and blob storage holding the record details.

View:
  Type: AWS::DynamoDB::Table
  Properties:
    TableName: ${opt:stage}-${self:service}-view
    AttributeDefinitions:
      - AttributeName: id
        AttributeType: S
    KeySchema:
      - AttributeName: id
        KeyType: HASH
    ProvisionedThroughput:
      ReadCapacityUnits: 1
      WriteCapacityUnits: 1

Next, we have the code to save the data in the view. It is a stream processor like the others we have discussed and leverages the functional reactive programming paradigm. The logic maps the required fields, sets up the conditional expression for the optimistic locking logic, and puts the data in the table. It catches and ignores ConditionalCheckFailedException. For all other exceptions, this example produces a fault based on the Stream Circuit Breaker pattern. In this example, oplock is based on the event timestamp. An alternative is to use the sequence number provided by the stream. This is useful when the timestamp may not be unique because it is too coarse-grained. However, if the events are not published to the stream in the correct order, then the sequence number will not be appropriate. In that case, if the timestamp is not unique either, then the event sourcing approach may be the proper solution.

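// Assumed imports: Highland for `_` and the AWS SDK for JavaScript (v2) for `aws`.
// recordToUow and forItemCreateOrUpdated are helpers defined elsewhere.
import _ from 'highland';
import aws from 'aws-sdk';

export const consumer = (event, context, cb) => {
  _(event.Records)
    .map(recordToUow)
    .filter(forItemCreateOrUpdated)
    .flatMap(save)
    .collect().toCallback(cb);
};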

const save = uow => {
  const params = {
    TableName: process.env.TABLE_NAME,
    Item: {
      // map the identifier
      id: uow.event.item.id,

      // map the fields needed for the view
      name: uow.event.item.name,
      description: uow.event.item.description,

      // include the timestamp or sequence number
      oplock: uow.event.timestamp,
    },

    // define the oplock logic
    ConditionExpression: 'attribute_not_exists(#oplk) OR #oplk < :ts',
    ExpressionAttributeNames: {
      '#oplk': 'oplock'
    },
    ExpressionAttributeValues: {
      ':ts': uow.event.timestamp
    },
  };

  const db = new aws.DynamoDB.DocumentClient();

  return _(db.put(params).promise()
    .catch(err => {
      // check if we can ignore the error
      if (err.code !== 'ConditionalCheckFailedException') {
        // otherwise raise a fault
        err.uow = uow;
        throw err;
      }
    })
  );
};
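
The error handling in the catch block can be exercised without a real table by pulling it into a small function. The sketch below is one possible way to do that, not part of the original example: `handlePutError` is a hypothetical helper name, and the error objects are hand-built stand-ins that carry only the `code` property that the logic inspects.

```javascript
// Hypothetical helper: decides what to do with an error raised by the conditional put.
const handlePutError = (uow) => (err) => {
  if (err.code === 'ConditionalCheckFailedException') {
    return undefined; // an older or duplicate event: safe to ignore
  }
  err.uow = uow; // attach the unit of work so that a fault can be raised for it
  throw err;
};

const uow = { event: { timestamp: 1000, item: { id: 'a' } } };

// A failed condition check is swallowed.
const stale = Object.assign(new Error('stale'), {
  code: 'ConditionalCheckFailedException',
});
const ignored = handlePutError(uow)(stale);

// Any other error is rethrown with the unit of work attached.
const throttled = Object.assign(new Error('throttled'), {
  code: 'ProvisionedThroughputExceededException',
});
let fault = null;
try {
  handlePutError(uow)(throttled);
} catch (err) {
  fault = err;
}
```

With a helper like this, the promise chain in save could be written as db.put(params).promise().catch(handlePutError(uow)), with the same behavior as the inline version.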