Performance and scalability
Usage Engine is designed to handle very large data volumes with strong processing guarantees. As a user designing a stream, there are a few concepts you need to be aware of that relate to performance and scalability.
Note!
By default, when working with files, Usage Engine Cloud Edition will either successfully process an entire input file, or not process the input file at all. This is called transactional processing. This transactional processing helps ensure that data is neither lost nor duplicated.
Batching multiple files per transaction
If you are processing many small files, the overhead of running one transaction per file can reduce performance.
It is possible to batch multiple files into a single transaction when collecting files with the Amazon S3 collector.
In the configuration:
Set the Transaction Batch Size to a value greater than 1 to process multiple files per transaction.
Note!
As a general rule of thumb, set the Transaction Batch Size to a value between 100 and 1000 if you have many small files. Look at your Stream metrics to see the time it takes to process each file and each transaction. You may need to tune this value to achieve optimal performance for your stream. You can change the transaction batch size between stream executions.
By default, when a stream is executed, Usage Engine Cloud Edition processes all input files in sequence in a single runtime instance of the stream.
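To see why batching helps, the following is a minimal sketch of a rough cost model. It assumes that each transaction adds a fixed overhead on top of the per-file processing time. The function name and all numbers are hypothetical and are not taken from Usage Engine; use the timings from your own stream metrics instead.

```python
import math


def estimated_execution_seconds(file_count, seconds_per_file,
                                seconds_per_transaction, batch_size):
    """Rough model: per-file work plus one fixed overhead per transaction."""
    transactions = math.ceil(file_count / batch_size)
    return file_count * seconds_per_file + transactions * seconds_per_transaction


# Hypothetical workload: 100,000 small files, 0.01 s of work per file,
# and 0.5 s of overhead per transaction.
for batch_size in (1, 100, 1000):
    total = estimated_execution_seconds(100_000, 0.01, 0.5, batch_size)
    print(f"batch size {batch_size:>4}: about {total:,.0f} s")
```

In this model most of the gain comes from moving away from a batch size of 1. Increasing the batch size further gives smaller and smaller improvements, which is why the value is worth tuning against real measurements.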
Running multiple stream replicas in parallel
If you are processing many records, processing them all in sequence may take a long time.
To resolve this issue, you can configure a number of stream replicas that process input files in parallel. This reduces the total time it takes to process all the records, that is, your stream execution time. Each stream replica runs its own transaction or series of transactions (depending on the batch size you have configured).
Note!
The number of stream replicas must be chosen when you create your stream. It cannot be changed once set. This is because persistent state in Usage Engine, such as aggregation sessions and de-duplication information, is stored separately per replica.
Replicas and file collection
When using replicas, you must structure your folders and files and assign filenames in a way that allows different replicas to collect different files. In other words, you must partition your input files.
This is accomplished using Variable insertion. The variable ${replicaNumber} is always available and has a value from 0 to N-1, where N is the number of replicas in your stream. For example, if you have 10 replicas, the replicaNumber variable will have values from 0 to 9.
You can use this to point different replicas to different folders. For example, if your folders are structured like this:
/input/folder-0
/input/folder-1
…
/input/folder-9
Then you can configure your Amazon S3 collector’s Folder field in File information like this:
/input/folder-${replicaNumber}
The result will be that the replica with replicaNumber = 0 will read all the files in /input/folder-0, the replica with replicaNumber = 1 will read all the input files in /input/folder-1, etc. Another way of saying this is that the input files have been split into 10 partitions, one per replica.
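The substitution described above can be sketched as follows. This is only an illustration: Usage Engine resolves ${replicaNumber} itself, and Python's string.Template is used here as a stand-in because it happens to support the same ${name} syntax. The function name folder_for_replica is hypothetical.

```python
from string import Template

# The Folder value from the example above.
FOLDER_PATTERN = "/input/folder-${replicaNumber}"


def folder_for_replica(replica_number, pattern=FOLDER_PATTERN):
    """Return the folder a given replica would collect from."""
    return Template(pattern).substitute(replicaNumber=replica_number)


replica_count = 10
# Replica numbers run from 0 to N-1.
folders = [folder_for_replica(n) for n in range(replica_count)]
print(folders[0], folders[-1])  # prints "/input/folder-0 /input/folder-9"
```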
Replicas and stateful processing
If your stream uses aggregation or de-duplication or any other stateful function that stores information persistently across stream executions, the same stateful session key value (e.g. user id for aggregation, or the field values used to de-duplicate records) must always be sent to the same replica. You control this by choosing which replicas process which files.
To make this more concrete, make sure that all files for a given customer id that you aggregate usage for are always processed by the same replica. For example, if you have 3000 customers and 3 replicas, you can distribute your files like this:
Replica 0 always processes files from customers 1-1000.
Replica 1 always processes files from customers 1001-2000.
Replica 2 always processes files from customers 2001-3000.
If you don’t do this, you may end up with multiple aggregation sessions for a single user, or your de-duplication may not catch all duplicates correctly. Both issues will create problems in your downstream system, e.g. for billing.
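One way to keep this mapping stable is to compute the target folder from the customer id in whatever system delivers the input files. The sketch below assumes numeric customer ids starting at 1, blocks of 1000 customers per replica, and the /input/folder-N layout from the earlier example. The function names are hypothetical. The result is the zero-based replicaNumber, so the first of the three replicas is number 0.

```python
def replica_for_customer(customer_id, customers_per_replica=1000):
    """Map a customer id to a zero-based replica number using fixed ranges."""
    if customer_id < 1:
        raise ValueError("customer ids are assumed to start at 1")
    return (customer_id - 1) // customers_per_replica


def input_folder(customer_id):
    """Folder in which files for this customer should be placed."""
    return f"/input/folder-{replica_for_customer(customer_id)}"


# Customers at the edges of each range.
for customer_id in (1, 1000, 1001, 2000, 2001, 3000):
    print(customer_id, "->", input_folder(customer_id))
```

Because the mapping depends only on the customer id, every file for a given customer lands in the same folder on every stream execution, and therefore reaches the same replica.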
Replicas and performance
When using replicas, the processing is done in parallel. If you partition your input files well, this can significantly speed up processing. This way of parallelizing processing is called horizontal scaling, or scaling out.
When not using replicas, a single stream instance processes all input files in sequence, as shown at the top of the illustration. The total stream execution time is the sum of the time taken to process each file.
When using replicas, as in the example with three stream replicas shown at the bottom of the illustration, the input files are processed in parallel, so the total stream execution time is much shorter.
This concept can be generalized to using hundreds of replicas across millions of files, scaling horizontally to reduce the total stream execution time.
Distributing input files evenly among replicas
To get the best performance from using replicas, it is important to partition the input files evenly. For example, suppose you have 100 replicas. If replica 0 collects 99% of all files and replicas 1-99 share the remaining 1%, your stream execution time will not improve much.
But if you distribute the input files evenly across the replicas, so each replica processes roughly 1% of all input files, then your stream execution time will be much shorter. This is what we mean by partitioning evenly.
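The effect of uneven partitioning can be sketched with a simplified model in which the stream execution finishes when the slowest replica finishes. The sketch assumes every file takes the same time to process and ignores startup and transaction overhead. The file counts and the one-second processing time are hypothetical.

```python
def stream_execution_time(files_per_replica, seconds_per_file=1.0):
    """The stream is done when the replica with the most files is done."""
    return max(files_per_replica) * seconds_per_file


replica_count = 100

# 10,000 files in total. Skewed: replica 0 gets 99% of them,
# replicas 1-99 share the remaining 100 files.
skewed = [9_900] + [2] + [1] * 98
# Even: each replica gets 1% of the files.
even = [100] * replica_count

print("single instance:", stream_execution_time([10_000]), "s")
print("skewed replicas:", stream_execution_time(skewed), "s")
print("even replicas:  ", stream_execution_time(even), "s")
```

In this model the skewed partitioning is only about 1% faster than a single instance, while the even partitioning is 100 times faster.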
Replicas and ordering of records
When not using replicas, records are processed in the same order they were collected.
When using replicas, the order of processing is preserved for all records processed by the same replica. However, there are no ordering guarantees between different replicas. For example, the following can happen:
Replica 0 collects record X at time T.
Replica 1 collects record Y at time T+1.
Replica 1 forwards record Y at time T+10.
Replica 0 forwards record X at time T+11.
Make sure to take this into account when building your streams and in any downstream system.
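The sequence above can be replayed in a small sketch to show what a downstream system would observe. The Event class and its field names are hypothetical, and T is taken as 0. If your downstream system needs a specific order, it should rely on a timestamp carried in the record rather than on arrival order.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    record: str
    replica: int
    collected_at: int   # time units relative to T
    forwarded_at: int   # time units relative to T


events = [
    Event(record="X", replica=0, collected_at=0, forwarded_at=11),
    Event(record="Y", replica=1, collected_at=1, forwarded_at=10),
]

collection_order = [e.record for e in sorted(events, key=lambda e: e.collected_at)]
arrival_order = [e.record for e in sorted(events, key=lambda e: e.forwarded_at)]

print("collected:", collection_order)  # prints "collected: ['X', 'Y']"
print("forwarded:", arrival_order)     # prints "forwarded: ['Y', 'X']"
```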
Frequently asked questions
How many replicas should I use?
Since the number of replicas cannot be modified, you must plan how many replicas you wish to set for a stream. You may need to do some testing and tuning before setting the number of replicas to run in the production environment.
Some things to consider:
How long does it take to process all your input in a single stream instance, without replicas? For example, for input files collected over a certain day, what is the total stream execution time?
How long is the stream execution time allowed to take? What time constraints do you have?
How evenly can you partition your input files across replicas?
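In an ideal scenario, the stream execution time is divided by the number of replicas: stream execution time with N replicas = stream execution time without replicas / N.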
However, this almost never happens in real life. When using replicas you will see a reduction of stream execution time that is typically somewhere between 20% and 99%. Generally, the more replicas, the shorter the execution time, but you will eventually hit a point where adding more replicas doesn’t reduce the stream execution time further.
This will depend on what your stream does, for example, whether it requires a warmup phase to cache data, how evenly input files are partitioned, whether it accesses external systems that introduce bottlenecks, and so on.
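As a starting point for planning, you can estimate a lower bound on the number of replicas. The sketch below assumes the ideal case, in which the execution time of a single instance is divided evenly by the number of replicas. As noted above, real streams rarely reach this, so treat the result as a minimum to test against, not as a recommendation. The function names and figures are hypothetical.

```python
import math


def ideal_execution_seconds(single_instance_seconds, replica_count):
    """Ideal case: execution time divides evenly across replicas."""
    return single_instance_seconds / replica_count


def minimum_replicas(single_instance_seconds, allowed_seconds):
    """Smallest replica count that could meet the time limit in the ideal case."""
    return math.ceil(single_instance_seconds / allowed_seconds)


# Hypothetical: one day's input takes 8 hours in a single instance,
# and the stream execution must finish within 1 hour.
single = 8 * 3600
allowed = 1 * 3600
replicas = minimum_replicas(single, allowed)
print("at least", replicas, "replicas")
print("ideal time:", ideal_execution_seconds(single, replicas), "s")
```

Since the number of replicas cannot be changed later, it is usually safer to test with a value above this minimum and measure the actual stream execution time.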
What if my data volumes grow and I need to increase the number of replicas?
This is called scaling out.
To do this, you need to create a copy of your stream with more replicas. Then you must ensure that you flush out all persistent state from the old running version, particularly aggregation sessions and de-duplication information. After that, you can start processing data with the new stream version that has more replicas.
What if I created too many replicas and don’t need them?
This is called scaling in.
Replicas that don’t find any input files will not be executed for more than a brief moment, so a simple approach is to just stop sending input files to some replicas. If you do need to reduce the number of replicas, follow the same steps as when increasing the number of replicas above.
Why can Usage Engine not scale out and in for me automatically?
We aim to provide elastic scaling in future versions of Usage Engine. For now, the number of stream replicas is static and can’t be changed.