feat(postgres): Change the postgres adapter to be partition aware #534
This adds a Postgres storage adapter for the taskbroker, along with a configuration option to choose between the adapters. The adapter also works with AlloyDB. Because `offset` is a reserved keyword in Postgres, that column is called `kafka_offset` in the PG tables and converted to `offset`. The tests were updated to run against both the SQLite and Postgres adapters using the rstest crate. The `create_test_store` function was updated to be the standard for all tests, and to allow choosing between a SQLite and a Postgres DB. A `remove_db` function was added to the trait and the existing adapters, since the tests create a unique PG database on every run that should be cleaned up.
Have the postgres adapter only fetch and do upkeep on activations that belong to the partitions assigned to the consumer. The broker can still update tasks outside its partitions, in case a worker is connected to a broker that is then rebalanced. Change the consumer to pass the partitions to the store whenever partitions are assigned. This was originally tested with PARTITION BY, but that requires manually keeping track of the partition tables, which isn't desired behaviour.
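To illustrate the intended semantics, here is a minimal in-memory sketch, not the adapter's actual code. `InMemoryStore`, `Activation`, `Status`, and every method name are hypothetical stand-ins; the real adapter runs SQL against Postgres. The sketch only shows that fetching is restricted to the assigned partitions while status updates by id are not.

```rust
use std::collections::BTreeMap;
use std::sync::RwLock;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Processing,
    Complete,
}

#[derive(Debug, Clone)]
struct Activation {
    id: String,
    partition: i32,
    status: Status,
}

/// Hypothetical in-memory stand-in for the Postgres-backed store.
struct InMemoryStore {
    rows: RwLock<BTreeMap<String, Activation>>,
    partitions: RwLock<Vec<i32>>,
}

impl InMemoryStore {
    fn new() -> Self {
        Self {
            rows: RwLock::new(BTreeMap::new()),
            partitions: RwLock::new(Vec::new()),
        }
    }

    /// Store a new pending activation that arrived on `partition`.
    fn insert(&self, id: &str, partition: i32) {
        let row = Activation {
            id: id.to_string(),
            partition,
            status: Status::Pending,
        };
        self.rows.write().unwrap().insert(row.id.clone(), row);
    }

    /// Called by the consumer whenever partitions are assigned.
    fn assign_partitions(&self, partitions: Vec<i32>) {
        *self.partitions.write().unwrap() = partitions;
    }

    /// Claim one pending activation, but only from assigned partitions.
    fn fetch_pending(&self) -> Option<Activation> {
        let assigned = self.partitions.read().unwrap().clone();
        let mut rows = self.rows.write().unwrap();
        let row = rows
            .values_mut()
            .find(|r| r.status == Status::Pending && assigned.contains(&r.partition))?;
        row.status = Status::Processing;
        Some(row.clone())
    }

    /// Update by id with no partition filter, so a worker can still
    /// report on a task whose partition has moved to another broker.
    fn set_status(&self, id: &str, status: Status) -> bool {
        match self.rows.write().unwrap().get_mut(id) {
            Some(row) => {
                row.status = status;
                true
            }
            None => false,
        }
    }
}
```

In this sketch, a store assigned only partition 0 never hands out an activation stored under partition 1, yet `set_status` on that activation still succeeds.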
state = match (state, event) {
    (ConsumerState::Ready, Event::Assign(tpl)) => {
        metrics::gauge!("arroyo.consumer.current_partitions").set(tpl.len() as f64);
        // Note: This assumes we only process one topic per consumer.
So far that has always been true.
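The single-topic assumption could also be checked explicitly when turning an assignment into a partition list. The following is a sketch only: `TopicPartition` is a hypothetical simplified stand-in for an entry of the Kafka topic-partition list, and `partitions_for_single_topic` is not a function from this PR.

```rust
use std::collections::BTreeSet;

/// Hypothetical simplified entry of a topic-partition assignment.
struct TopicPartition {
    topic: String,
    partition: i32,
}

/// Return the sorted, de-duplicated partition ids of an assignment,
/// or an error if the assignment spans more than one topic.
fn partitions_for_single_topic(tpl: &[TopicPartition]) -> Result<Vec<i32>, String> {
    let topics: BTreeSet<&str> = tpl.iter().map(|tp| tp.topic.as_str()).collect();
    if topics.len() > 1 {
        return Err(format!("expected one topic, got {}", topics.len()));
    }
    let partitions: BTreeSet<i32> = tpl.iter().map(|tp| tp.partition).collect();
    Ok(partitions.into_iter().collect())
}
```

Returning an error rather than silently merging partition ids from several topics would surface the problem if the assumption ever stopped holding.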
read_pool: PgPool,
write_pool: PgPool,
config: PostgresActivationStoreConfig,
partitions: RwLock<Vec<i32>>,
Should you use a tokio RwLock to avoid locking up one of the tokio threads?
No, because updating the partitions isn't an asynchronous operation, so I want this to block.
Ok, we'll need to be careful about .await calls inside the lock guards as we could deadlock more easily than with an async lock.
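One way to keep guards away from `.await` points is to copy the partitions out of the lock and release it before any query runs. This is a sketch under assumptions: `PartitionState`, `assign`, and `snapshot` are hypothetical names, and only the `RwLock<Vec<i32>>` field mirrors the diff.

```rust
use std::sync::RwLock;

/// Hypothetical holder for the assigned partitions, mirroring the
/// `partitions: RwLock<Vec<i32>>` field from the diff.
struct PartitionState {
    partitions: RwLock<Vec<i32>>,
}

impl PartitionState {
    fn new() -> Self {
        Self {
            partitions: RwLock::new(Vec::new()),
        }
    }

    /// Replace the assigned partitions. The write guard is a temporary
    /// that is dropped at the end of the statement.
    fn assign(&self, partitions: Vec<i32>) {
        *self.partitions.write().unwrap() = partitions;
    }

    /// Return an owned copy of the partitions. The read guard is dropped
    /// before this function returns, so callers never hold the lock.
    fn snapshot(&self) -> Vec<i32> {
        self.partitions.read().unwrap().clone()
    }
}
```

An async caller would call `snapshot()` first and then `.await` the query using the owned `Vec<i32>`, so no guard is alive across the await point. The cost is one small clone per query.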
markstory
left a comment
Query changes look good to me.
This was duplicated in code that @james-mcnulty is opening PRs for.