Yeah, I mean, I think we're all basically doing this now, right? I wouldn't choose this design, but I think something similar to DeltaLake can be simplified down for tons of use cases. Manifest with CAS + buffered objects to S3, maybe compaction if you intend to do lots of reads. It's not hard to put it together.
You can achieve stupidly fast read/write operations if you do this right, with a system that is shockingly simple to reason about.
> Step 4: queue.json with an HA brokered group commit
> The broker is stateless, so it's easy and inexpensive to move. And if we end up with more than one broker at a time? That's fine: CAS ensures correctness even with two brokers.
TBH this is the part that I think is tricky. Just resolving this in a way that doesn't end up with tons of clients wasting time talking to a broker that buffers their writes, pushes them, then always fails. I solved this at one point with token fencing and then decided it wasn't worth it and I just use a single instance to manage all writes. I'd again point to DeltaLake for the "good" design here, which is to have multiple manifests and compaction, which also unlocks parallel writers.
The other hard part is data deletion. For the queue it looks deadly simple since it's one file, but if you want to ramp up your scale and get multiple writers or manage indexes (also in S3) then deletion becomes something you have to slip into compaction. Again, I had it at one point and backed it out because it was painful.
But I have 40k writes per second working just fine for my setup, so I'm not worrying. I'd suggest others basically punt as hard as possible on this. If you need more writes, start up a separate index with its own partition for its own separate set of data, or do naive sharding.
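For anyone who wants to see the shape of "manifest with CAS + buffered objects", here is a minimal sketch. It assumes an object store with conditional writes; `InMemoryStore`, `put_if_version` and `commit_batch` are hypothetical names invented for this illustration, not any provider's API, and the in-memory store only stands in for the real thing.

```python
import json
import threading


class InMemoryStore:
    """Stand-in for an object store with conditional writes (hypothetical API)."""

    def __init__(self):
        self._objects = {}  # key -> (version, bytes)
        self._lock = threading.Lock()

    def get(self, key):
        """Return (version, data); a missing key reads as version 0, data None."""
        with self._lock:
            return self._objects.get(key, (0, None))

    def put_if_version(self, key, data, expected_version):
        """Write only if the stored version still matches; return success."""
        with self._lock:
            current, _ = self._objects.get(key, (0, None))
            if current != expected_version:
                return False
            self._objects[key] = (current + 1, data)
            return True


def commit_batch(store, batch_key, records, manifest_key="manifest.json", retries=10):
    """Upload a buffered batch, then publish it by CAS-ing the manifest."""
    # Data objects are immutable and uniquely named, so the expected version is 0.
    if not store.put_if_version(batch_key, json.dumps(records).encode(), 0):
        raise ValueError(f"{batch_key} already exists")
    for _ in range(retries):
        version, raw = store.get(manifest_key)
        manifest = json.loads(raw) if raw else {"batches": []}
        manifest["batches"].append(batch_key)
        payload = json.dumps(manifest).encode()
        if store.put_if_version(manifest_key, payload, version):
            return version + 1  # new manifest version
        # Lost the race to another writer: re-read the manifest and retry.
    raise RuntimeError("manifest CAS kept failing")
```

Readers only ever trust what the manifest lists, so a batch that was uploaded but never published is just garbage to clean up later, not a correctness problem.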
- concurrency is very hard
- .. but object storage "solves" most of that for you, handing you a set of semantics which work reliably
- single file throughput sucks hilariously badly
- .. because 1GB is ridiculously large for an atomic unit
- (this whole thing resembles a project I did a decade ago for transactional consistency on TFAT on Flash, except that somehow managed faster commit times despite running on a 400MHz MIPS CPU. Edit: maybe I should try to remember how that worked and write it up for HN)
- therefore, all of the actual work is shifted to the broker. The broker is just periodically committing its state in case it crashes
- it's not clear whether the broker ACKs requests before they're in durable storage? Is it possible to lose requests in flight anyway?
- there's a great design for a message queue system between multiple nodes that aims for at least once delivery, and has existed for decades, while maintaining high throughput: SMTP. Actually, there's a whole bunch of message queue systems?
> The broker runs a single group commit loop on behalf of all clients, so no one contends for the object. Critically, it doesn't acknowledge a write until the group commit has landed in object storage. No client moves on until its data is durably committed.
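A rough sketch of what the quoted group commit could look like, to make the "no ACK before durable" point concrete. This is not the article's implementation: `GroupCommitBroker`, `submit`, `commit_once` and the `store_put` callable are hypothetical names, and `store_put` is simply assumed to return only once the write is durable (in a real system it would be the CAS on `queue.json`).

```python
import json
import threading
from concurrent.futures import Future


class GroupCommitBroker:
    """Buffers writes and acknowledges them only after one durable commit."""

    def __init__(self, store_put):
        self._store_put = store_put  # callable(bytes); assumed durable on return
        self._pending = []           # (record, Future) pairs awaiting commit
        self._committed = []         # state as of the last successful commit
        self._lock = threading.Lock()

    def submit(self, record):
        """Queue a record; the returned Future resolves once it is durable."""
        fut = Future()
        with self._lock:
            self._pending.append((record, fut))
        return fut

    def commit_once(self):
        """Write everything buffered so far as a single object-store write."""
        with self._lock:
            batch, self._pending = self._pending, []
        if not batch:
            return 0
        new_state = self._committed + [record for record, _ in batch]
        try:
            self._store_put(json.dumps(new_state).encode())
        except Exception as exc:
            # Nothing landed, so nobody gets an ACK; callers see the error.
            for _, fut in batch:
                fut.set_exception(exc)
            return 0
        self._committed = new_state
        for _, fut in batch:
            fut.set_result(True)  # ACK strictly after the durable write
        return len(batch)
```

A real broker would call `commit_once` in a loop on a timer or when the buffer fills; the sketch leaves the loop out so the ordering of write and ACK stays visible.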
You can, and it's actually great if you store little "headers" etc to tell you those offsets. Their design doesn't seem super amenable to it because it appears to be one file, but this is why a system that actually intends to scale would break things up. You then cache these headers and, on cache hit, you know "the thing I want is in that chunk of the file, grab it". Throw in bloom filters and now you have a query engine.
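To make the header idea concrete, here is a toy sketch under a lot of assumptions: the chunk layout, `Bloom`, `build_object`, `lookup` and the `read_range` callable are all made up for illustration (with `read_range` standing in for something like an HTTP Range GET), and the header is kept in memory as if it were already cached.

```python
import hashlib
import json


class Bloom:
    """Tiny Bloom filter: false positives are possible, false negatives are not."""

    def __init__(self, bits=1024, hashes=3):
        self.bits, self.hashes, self.field = bits, hashes, 0

    def _positions(self, key):
        for i in range(self.hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.bits

    def add(self, key):
        for pos in self._positions(key):
            self.field |= 1 << pos

    def might_contain(self, key):
        return all((self.field >> pos) & 1 for pos in self._positions(key))


def build_object(chunks):
    """chunks: list of {key: value} dicts. Returns (header, body bytes)."""
    header, body = [], b""
    for chunk in chunks:
        blob = json.dumps(chunk).encode()
        bloom = Bloom()
        for key in chunk:
            bloom.add(key)
        # The header records where each chunk lives inside the big object.
        header.append({"offset": len(body), "length": len(blob), "bloom": bloom})
        body += blob
    return header, body


def lookup(header, read_range, key):
    """Find key using the cached header, reading only candidate chunks."""
    for entry in header:
        if not entry["bloom"].might_contain(key):
            continue  # definitely not in this chunk, so skip the read
        chunk = json.loads(read_range(entry["offset"], entry["length"]))
        if key in chunk:
            return chunk[key]
    return None
```

The point is that a cache hit on the header turns "download the whole file" into one small ranged read per candidate chunk.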
Amazon S3 Select enables SQL queries directly on CSV, JSON, or Apache Parquet objects, allowing retrieval of filtered data subsets to reduce latency and costs
Love this writeup. There's so much interesting stuff you can build on top of Object Storage + compare-and-swap. You learn a lot about distributed systems this way.
I'd love to see a full sample implementation based on s3 + ecs - just to study how it works.
The original graph appears to simply show the blocking issue of their previous synchronisation mechanism; 10 min to process an item down to 6 min. Any central system would seem to resolve this for them.
In any organisation it's good to make choices for simplicity rather than small optimisations - you're optimising maintenance, incident resolution, and development.
Typically I have a small pg server for these things. It'll work out slightly more expensive than this setup for one action, yet it will cope with so much more - extending to all kinds of other queues and config management - with simple management, off the shelf diagnostics etc.
While the object store is neat, there is a confluence of factors which make it great and simple for this workload, that may not extend to others. 200ms latency is a lot for other workloads, 5GB/s doesn't leave a lot of headroom, etc. And I don't want to be asked to diagnose transient issues with this.
So I'm torn. It's simple to deploy and configure from a fresh deployment PoV. Yet it wouldn't be accepted into any deployment I have worked on.
The usual path an engineer takes is to take a complex and slow system and reengineer it into something simple, fast, and wrong. But as far as I can tell from the description in the blog, it actually works at scale! This feels like a free lunch and I’m wondering what the tradeoff is.
It seems like this is an approach that trades off scale and performance for operational simplicity. They say they only have 1GB of records and they can use a single committer to handle all requests. Failover happens by missing a compare-and-set so there's probably a second of latency to become leader?
This is not to say it's a bad system, but it's very precisely tailored for their needs. If you look at the original Kafka implementation, for instance, it was also very simple and targeted. As you bolt on more use cases and features you lose the simplicity to try and become all things to all people.
> Failover happens by missing a compare-and-set so there's probably a second of latency to become leader?
Conceptually that makes sense. How complicated is it to implement this failover logic in a safe way? If there are two processes, competing for CAS wins, is there not a risk that both will think they're non-leaders and terminate themselves?
This is the hardest part because you can easily end up in a situation like you're describing, or having large portions of clients talking to a server just to have their writes rejected.
Further, this system (as described) scales best when writes are colocated (since it maximizes throughput via buffering). So even just by having a second writer you cut your throughput in ~half if one of them is basically dead.
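For reference, the token fencing idea looks roughly like this. It is a single-threaded sketch with hypothetical names (`FencedStore`, `Broker`, `acquire`, `flush`); on a real object store the epoch would presumably live inside the manifest and be checked as part of the same CAS, which this toy version does not model.

```python
class FencedStore:
    """Hypothetical store that rejects writes carrying a stale fencing token."""

    def __init__(self):
        self.epoch = 0   # highest token issued so far
        self.state = []

    def acquire(self):
        """Become the writer: bump the epoch and return it as the token."""
        self.epoch += 1
        return self.epoch

    def write(self, token, records):
        """Accept the write only from the holder of the newest token."""
        if token != self.epoch:
            return False
        self.state.extend(records)
        return True


class Broker:
    """Stops serving as soon as it learns it has been fenced."""

    def __init__(self, store):
        self.store = store
        self.token = store.acquire()
        self.active = True

    def flush(self, records):
        if not self.active:
            # Fail fast so clients stop buffering writes on a dead broker.
            raise RuntimeError("fenced: redirect clients to the current broker")
        if not self.store.write(self.token, records):
            self.active = False
            raise RuntimeError("fenced: a newer broker holds the token")
        return True
```

The part that matters for the wasted-work problem is `active`: a fenced broker rejects clients up front instead of buffering their writes and failing every flush.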
If you split things up you can just do "merge manifests on conflict" since different writers would be writing to different files and the manifest is just an index, or you can do multiple manifests + compaction. DeltaLake does the latter, so you end up with a bunch of `0000.json`, `0001.json` and to reconstruct the full index you read all of them. You still have conflicts on allocating the json file but that's it, no wasted flushing. And then you can merge as you please.
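A small sketch of the numbered-manifest approach, loosely modelled on the description above rather than on DeltaLake's actual code. `Store`, `put_if_absent`, `append_manifest` and `load_index` are hypothetical names, the `log/` prefix is an assumption, and compaction is left out entirely.

```python
import json


class Store:
    """Stand-in for an object store with a put-if-absent primitive."""

    def __init__(self):
        self.objects = {}

    def put_if_absent(self, key, data):
        """Create the object only if the key is unused; return success."""
        if key in self.objects:
            return False
        self.objects[key] = data
        return True

    def list(self, prefix):
        return sorted(key for key in self.objects if key.startswith(prefix))


def append_manifest(store, entries, prefix="log/"):
    """Claim the next free NNNN.json; the only conflict is on the file name."""
    n = len(store.list(prefix))
    payload = json.dumps(entries).encode()
    while not store.put_if_absent(f"{prefix}{n:04d}.json", payload):
        n += 1  # another writer took that slot, so try the next one
    return n


def load_index(store, prefix="log/"):
    """Rebuild the full index by reading every manifest in order."""
    index = []
    # Zero-padded names sort lexicographically up to 9999.json.
    for key in store.list(prefix):
        index.extend(json.loads(store.objects[key]))
    return index
```

The data files are already uploaded by the time a writer claims a manifest slot, so losing the race costs one more small PUT rather than a re-flush of the buffered data.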
Depending on who hosts your object storage this seems like it could get much more expensive than using a queue table in your database? But I'm also aware that this is a blog post of an object storage company.
We don't have a relational database, otherwise that would work great for a queue! You can imagine us continuing to iterate here to Step 5, Step 6, ... Step N over time. The tradeoff of each step is complexity, and complexity has to be deserved. This is working exceptionally well currently.
> You can imagine us continuing to iterate here to Step 5, Step 6, ... Step N over time. The tradeoff of each step is complexity, and complexity has to be deserved. This is working exceptionally well currently.
Makes total sense for your use case! I've been bitten by using object storage as a database before (and churning through "update" ops), so this will depend on the provider's pricing (and the busy-ness of the queue, of course). Using whatever you have available instead of introducing complexity is the way. SQLite / Postgres goes a long way for use cases you wouldn't originally think would go well with a relational database too (full text search, using it as a queue, ...).
This post touches on a realisation I made a while ago, just how far you can get with the guarantees and trade-offs of object storage.
What actually _needs_ to be in the database? I've never gone as far as building a job queue on top of object storage, but have been involved in building surprisingly consistent and reliable systems with object storage.
Assuming you're already using object storage in your project but don't use Redis yet, it wouldn't be re-inventing anything, just avoiding an extra dependency that would only be used by a single feature.
Works great for Parquet.
Still, it's nbd. You can cache a billion Parquet headers/footers on disk or in memory and get 90% of the performance (or better tbh).
Love this approach