Bulk writes

This page describes how you can improve performance in high-throughput scenarios by using bulk writes.

In high-throughput scenarios it can be important to avoid the overhead of a large number of requests and network round trips when updating or creating multiple aggregates.

The bulk write API targets an aggregate type instead of a specific aggregate. This allows client applications to write event batches to different aggregates of the same type in a single request.

An example use case

Consider a blog application that notifies all subscribers to the blog when a new post is published. Instead of writing a SubscriberNotified event for each subscriber and storing the events one by one, we can use the bulk write API to store a SubscriberNotified event for each Subscriber aggregate in a single request.

Below is an example of saving a notification to two subscribers in a single request. We send a POST request to https://api.serialized.io/aggregates/subscriber/events. Note that the aggregate IDs are present in the batches in the payload instead of in the URL.

{
  "batches": [
    {
      "aggregateId": "3ff500bf-d33d-46e7-8c5c-8445a2c79eb5",
      "expectedVersion": 7,
      "events": [
        {
          "eventType": "SubscriberNotified",
          "data": {
            "newBlogPostUrl": "https://example.com/some-url"
          }
        }
      ]
    },
    {
      "aggregateId": "9dbcd8a1-3e87-4e2d-be54-806cd56d7cc3",
      "expectedVersion": 7,
      "events": [
        {
          "eventType": "SubscriberNotified",
          "data": {
            "newBlogPostUrl": "https://example.com/some-url"
          }
        }
      ]
    }
  ]
}
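
For illustration, below is a minimal sketch of sending this request from Java using the JDK's built-in HTTP client. The access key values are placeholders, and the authentication header names are assumed here to be Serialized's standard API key headers; check your account settings for the actual keys.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BulkWriteExample {

  public static void main(String[] args) throws Exception {

    // The request body from the example above, shortened to a single batch
    String payload = """
        {
          "batches": [
            {
              "aggregateId": "3ff500bf-d33d-46e7-8c5c-8445a2c79eb5",
              "expectedVersion": 7,
              "events": [
                {
                  "eventType": "SubscriberNotified",
                  "data": { "newBlogPostUrl": "https://example.com/some-url" }
                }
              ]
            }
          ]
        }
        """;

    HttpRequest request = HttpRequest.newBuilder(URI.create("https://api.serialized.io/aggregates/subscriber/events"))
        // Placeholder credentials; header names assume Serialized's standard API key authentication
        .header("Serialized-Access-Key", "<YOUR_ACCESS_KEY>")
        .header("Serialized-Secret-Access-Key", "<YOUR_SECRET_ACCESS_KEY>")
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(payload))
        .build();

    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());

    // A successful bulk write is indicated by a 2xx status code
    System.out.println("Response status: " + response.statusCode());
  }
}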

Aggregate uniqueness and API constraints

There are some constraints that you should be aware of before deciding whether bulk writes are a good match for your use case.

The first constraint concerns aggregate uniqueness: the batches in a single request must target different aggregates, so each aggregate ID may only appear once per bulk request.

An additional constraint is that the size of each request to the API is limited to 1MB. To write more data using the bulk API, see the Partitioning the load section below.

Another important difference from regular aggregate writes is that all batches in a bulk request must use Optimistic Concurrency by setting the expectedVersion field. Setting it to 0 is allowed and effectively means creating the aggregate instead of updating it.
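
For example, a request body like the following sketch would create a brand-new Subscriber aggregate rather than update an existing one. The aggregate ID here is a made-up example:

{
  "batches": [
    {
      "aggregateId": "c1a2f3d4-5b6e-47a8-9c0d-1e2f3a4b5c6d",
      "expectedVersion": 0,
      "events": [
        {
          "eventType": "SubscriberNotified",
          "data": {
            "newBlogPostUrl": "https://example.com/some-url"
          }
        }
      ]
    }
  ]
}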

Partitioning the load

Even though the bulk write API can handle a large number of events at once, you should always guard against the case where the number of events results in a payload larger than the maximum allowed size (1MB).

To handle this, we advise you to always partition the workload when working with the bulk write API, so that you make sure the whole workload gets processed. The individual bulks can be stored in parallel since they are independent of each other.

Below is an example of handling this fan-out scenario in Java, using the Java client.

import java.util.HashSet;
import java.util.List;
import java.util.UUID;

import com.google.common.collect.Lists; // Guava, for partitioning the ID list

// Imports of AggregateClient (Serialized Java client) and the application's
// Subscription/SubscriptionState domain classes are omitted here.
public class Partitioning {

  public static final int BULK_SIZE = 64;

  private final AggregateClient<SubscriptionState> subscriptionClient;

  public Partitioning(AggregateClient<SubscriptionState> subscriptionClient) {
    this.subscriptionClient = subscriptionClient;
  }

  public void handleBlogPostPublished(String postUrl) {

    // Find the aggregate IDs to operate on
    List<UUID> subscriptionIds = findAllSubscriptions();

    // Partition the workload into bulks of at most BULK_SIZE aggregates
    Lists.partition(subscriptionIds, BULK_SIZE)

      // Updates can be parallel since we only operate on unique aggregates
      .parallelStream()

      // Perform one bulk update per partition
      .forEach(subscriptions -> subscriptionClient.bulkUpdate(new HashSet<>(subscriptions), state -> {
        Subscription subscription = Subscription.fromState(state);
        // Return the events that should be stored (SubscriberNotified)
        return subscription.notifyBlogPostUpdated(postUrl);
      }));
  }

  // Looks up the IDs of all subscriber aggregates, e.g. from a projection (implementation omitted)
  private List<UUID> findAllSubscriptions() {
    throw new UnsupportedOperationException("Implementation omitted");
  }
}
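
A hypothetical usage of the class above, assuming subscriptionClient is an already-configured AggregateClient<SubscriptionState> (client construction is omitted since it depends on your setup):

// Hypothetical wiring: subscriptionClient is assumed to be configured elsewhere
Partitioning partitioning = new Partitioning(subscriptionClient);

// Fans out one SubscriberNotified event per subscriber, stored in parallel bulks of 64
partitioning.handleBlogPostPublished("https://example.com/some-url");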