Skip to content

Commit 321ebff

Browse files
committed
feat: add orchestrator/common package
Adds a package with common resources that can be shared between more than one orchestrator implementation. This is conceptually distinct from the common resources in the `database` package, which are shared across all current and future orchestrator implementations. PLAT-417
1 parent b768409 commit 321ebff

28 files changed

+3093
-7
lines changed

server/cmd/root.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/pgEdge/control-plane/server/internal/migrate"
2323
"github.com/pgEdge/control-plane/server/internal/monitor"
2424
"github.com/pgEdge/control-plane/server/internal/orchestrator"
25+
"github.com/pgEdge/control-plane/server/internal/orchestrator/common"
2526
"github.com/pgEdge/control-plane/server/internal/orchestrator/swarm"
2627
"github.com/pgEdge/control-plane/server/internal/ports"
2728
"github.com/pgEdge/control-plane/server/internal/resource"
@@ -95,6 +96,7 @@ func newRootCmd(i *do.Injector) *cobra.Command {
9596
filesystem.RegisterResourceTypes(registry)
9697
monitor.RegisterResourceTypes(registry)
9798
scheduler.RegisterResourceTypes(registry)
99+
common.RegisterResourceTypes(registry)
98100
swarm.RegisterResourceTypes(registry)
99101

100102
if err := orchestrator.Provide(i); err != nil {

server/internal/database/orchestrator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ type Orchestrator interface {
172172
GetInstanceConnectionInfo(ctx context.Context, databaseID, instanceID string) (*ConnectionInfo, error)
173173
GetServiceInstanceStatus(ctx context.Context, serviceInstanceID string) (*ServiceInstanceStatus, error)
174174
CreatePgBackRestBackup(ctx context.Context, w io.Writer, instanceID string, options *pgbackrest.BackupOptions) error
175+
ExecuteInstanceCommand(ctx context.Context, w io.Writer, databaseID, instanceID string, args ...string) error
175176
ValidateInstanceSpecs(ctx context.Context, changes []*InstanceSpecChange) ([]*ValidationResult, error)
176177
StopInstance(ctx context.Context, instanceID string) error
177178
StartInstance(ctx context.Context, instanceID string) error
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
package common
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"path/filepath"
8+
9+
"github.com/samber/do"
10+
"github.com/spf13/afero"
11+
clientv3 "go.etcd.io/etcd/client/v3"
12+
13+
"github.com/pgEdge/control-plane/server/internal/certificates"
14+
"github.com/pgEdge/control-plane/server/internal/etcd"
15+
"github.com/pgEdge/control-plane/server/internal/filesystem"
16+
"github.com/pgEdge/control-plane/server/internal/patroni"
17+
"github.com/pgEdge/control-plane/server/internal/resource"
18+
)
19+
20+
const (
21+
etcdCaCertName = "ca.crt"
22+
etcdClientCertName = "client.crt"
23+
etcdClientKeyName = "client.key"
24+
)
25+
26+
var _ resource.Resource = (*EtcdCreds)(nil)
27+
28+
const ResourceTypeEtcdCreds resource.Type = "common.etcd_creds"
29+
30+
func EtcdCredsIdentifier(instanceID string) resource.Identifier {
31+
return resource.Identifier{
32+
ID: instanceID,
33+
Type: ResourceTypeEtcdCreds,
34+
}
35+
}
36+
37+
type EtcdCreds struct {
38+
InstanceID string `json:"instance_id"`
39+
DatabaseID string `json:"database_id"`
40+
HostID string `json:"host_id"`
41+
NodeName string `json:"node_name"`
42+
ParentID string `json:"parent_id"`
43+
OwnerUID int `json:"owner_uid"`
44+
OwnerGID int `json:"owner_gid"`
45+
Username string `json:"username"`
46+
Password string `json:"password"`
47+
CaCert []byte `json:"ca_cert"`
48+
ClientCert []byte `json:"client_cert"`
49+
ClientKey []byte `json:"client_key"`
50+
}
51+
52+
func (c *EtcdCreds) ResourceVersion() string {
53+
return "1"
54+
}
55+
56+
func (c *EtcdCreds) DiffIgnore() []string {
57+
return []string{
58+
"/username",
59+
"/password",
60+
"/ca_cert",
61+
"/client_cert",
62+
"/client_key",
63+
}
64+
}
65+
66+
func (c *EtcdCreds) Executor() resource.Executor {
67+
return resource.HostExecutor(c.HostID)
68+
}
69+
70+
func (c *EtcdCreds) Identifier() resource.Identifier {
71+
return EtcdCredsIdentifier(c.InstanceID)
72+
}
73+
74+
func (c *EtcdCreds) Dependencies() []resource.Identifier {
75+
return []resource.Identifier{
76+
filesystem.DirResourceIdentifier(c.ParentID),
77+
}
78+
}
79+
80+
func (c *EtcdCreds) TypeDependencies() []resource.Type {
81+
return nil
82+
}
83+
84+
func (c *EtcdCreds) Refresh(ctx context.Context, rc *resource.Context) error {
85+
fs, err := do.Invoke[afero.Fs](rc.Injector)
86+
if err != nil {
87+
return err
88+
}
89+
90+
parentFullPath, err := filesystem.DirResourceFullPath(rc, c.ParentID)
91+
if err != nil {
92+
return fmt.Errorf("failed to get parent full path: %w", err)
93+
}
94+
certsDir := filepath.Join(parentFullPath, "etcd")
95+
96+
caCert, err := ReadResourceFile(fs, filepath.Join(certsDir, etcdCaCertName))
97+
if err != nil {
98+
return fmt.Errorf("failed to read CA cert: %w", err)
99+
}
100+
clientCert, err := ReadResourceFile(fs, filepath.Join(certsDir, etcdClientCertName))
101+
if err != nil {
102+
return fmt.Errorf("failed to read client cert: %w", err)
103+
}
104+
clientKey, err := ReadResourceFile(fs, filepath.Join(certsDir, etcdClientKeyName))
105+
if err != nil {
106+
return fmt.Errorf("failed to read client key: %w", err)
107+
}
108+
109+
c.CaCert = caCert
110+
c.ClientCert = clientCert
111+
c.ClientKey = clientKey
112+
113+
return nil
114+
}
115+
116+
func (c *EtcdCreds) Create(ctx context.Context, rc *resource.Context) error {
117+
fs, err := do.Invoke[afero.Fs](rc.Injector)
118+
if err != nil {
119+
return err
120+
}
121+
certService, err := do.Invoke[*certificates.Service](rc.Injector)
122+
if err != nil {
123+
return err
124+
}
125+
etcdClient, err := do.Invoke[*clientv3.Client](rc.Injector)
126+
if err != nil {
127+
return err
128+
}
129+
130+
parentFullPath, err := filesystem.DirResourceFullPath(rc, c.ParentID)
131+
if err != nil {
132+
return fmt.Errorf("failed to get parent full path: %w", err)
133+
}
134+
certsDir := filepath.Join(parentFullPath, "etcd")
135+
136+
etcdCreds, err := etcd.CreateInstanceEtcdUser(ctx,
137+
etcdClient,
138+
certService,
139+
etcd.InstanceUserOptions{
140+
InstanceID: c.InstanceID,
141+
KeyPrefix: patroni.ClusterPrefix(c.DatabaseID, c.NodeName),
142+
Password: c.Password,
143+
},
144+
)
145+
if err != nil {
146+
return fmt.Errorf("failed to create etcd user: %w", err)
147+
}
148+
149+
c.Username = etcdCreds.Username
150+
c.Password = etcdCreds.Password
151+
c.CaCert = etcdCreds.CaCert
152+
c.ClientCert = etcdCreds.ClientCert
153+
c.ClientKey = etcdCreds.ClientKey
154+
155+
if err := fs.MkdirAll(certsDir, 0o700); err != nil {
156+
return fmt.Errorf("failed to create etcd certificates directory: %w", err)
157+
}
158+
if err := fs.Chown(certsDir, c.OwnerUID, c.OwnerGID); err != nil {
159+
return fmt.Errorf("failed to change ownership for certificates directory: %w", err)
160+
}
161+
162+
files := map[string][]byte{
163+
etcdCaCertName: c.CaCert,
164+
etcdClientCertName: c.ClientCert,
165+
etcdClientKeyName: c.ClientKey,
166+
}
167+
168+
for name, content := range files {
169+
if err := afero.WriteFile(fs, filepath.Join(certsDir, name), content, 0o600); err != nil {
170+
return fmt.Errorf("failed to write %s: %w", name, err)
171+
}
172+
if err := fs.Chown(filepath.Join(certsDir, name), c.OwnerUID, c.OwnerGID); err != nil {
173+
return fmt.Errorf("failed to change ownership for %s: %w", name, err)
174+
}
175+
}
176+
177+
return nil
178+
}
179+
180+
func (c *EtcdCreds) Update(ctx context.Context, rc *resource.Context) error {
181+
return c.Create(ctx, rc)
182+
}
183+
184+
func (c *EtcdCreds) Delete(ctx context.Context, rc *resource.Context) error {
185+
fs, err := do.Invoke[afero.Fs](rc.Injector)
186+
if err != nil {
187+
return err
188+
}
189+
certService, err := do.Invoke[*certificates.Service](rc.Injector)
190+
if err != nil {
191+
return err
192+
}
193+
etcdClient, err := do.Invoke[*clientv3.Client](rc.Injector)
194+
if err != nil {
195+
return err
196+
}
197+
198+
parentFullPath, err := filesystem.DirResourceFullPath(rc, c.ParentID)
199+
if err != nil {
200+
return fmt.Errorf("failed to get parent full path: %w", err)
201+
}
202+
etcdDir := filepath.Join(parentFullPath, "etcd")
203+
204+
if err := fs.RemoveAll(etcdDir); err != nil {
205+
return fmt.Errorf("failed to remove certificates directory: %w", err)
206+
}
207+
if err := etcd.RemoveInstanceEtcdUser(ctx, etcdClient, certService, c.InstanceID); err != nil {
208+
return fmt.Errorf("failed to delete etcd user: %w", err)
209+
}
210+
211+
return nil
212+
}
213+
214+
func ReadResourceFile(fs afero.Fs, path string) ([]byte, error) {
215+
contents, err := afero.ReadFile(fs, path)
216+
if errors.Is(err, afero.ErrFileNotFound) {
217+
return nil, resource.ErrNotFound
218+
} else if err != nil {
219+
return nil, err
220+
}
221+
return contents, nil
222+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
name: storefront-n1-689qacsi
2+
namespace: /patroni/
3+
scope: storefront:n1
4+
log:
5+
type: json
6+
level: INFO
7+
static_fields:
8+
database_id: storefront
9+
instance_id: storefront-n1-689qacsi
10+
node_name: n1
11+
bootstrap:
12+
dcs:
13+
loop_wait: 10
14+
ttl: 30
15+
retry_timeout: 10
16+
postgresql:
17+
parameters:
18+
max_connections: 901
19+
max_replication_slots: 16
20+
max_wal_senders: 16
21+
max_worker_processes: 12
22+
track_commit_timestamp: "on"
23+
wal_level: logical
24+
ignore_slots:
25+
- plugin: spock_output
26+
initdb:
27+
- data-checksums
28+
etcd3:
29+
hosts:
30+
- i-0123456789abcdef.ec2.internal:2379
31+
protocol: https
32+
username: instance.storefront-n1-689qacsi
33+
password: password
34+
cacert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/etcd/ca.crt
35+
cert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/etcd/client.crt
36+
key: /var/lib/pgsql/storefront-n1-689qacsi/certificates/etcd/client.key
37+
postgresql:
38+
authentication:
39+
superuser:
40+
username: pgedge
41+
sslmode: verify-full
42+
sslkey: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/superuser.key
43+
sslcert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/superuser.crt
44+
sslrootcert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/ca.crt
45+
replication:
46+
username: patroni_replicator
47+
sslmode: verify-full
48+
sslkey: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/replication.key
49+
sslcert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/replication.crt
50+
sslrootcert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/ca.crt
51+
connect_address: storefront-n1-689qacsi.storefront-database:5432
52+
data_dir: /var/lib/pgsql/storefront-n1-689qacsi/data/pgdata
53+
listen: "*:5432"
54+
parameters:
55+
archive_command: /bin/true
56+
archive_mode: "on"
57+
autovacuum_max_workers: 3
58+
autovacuum_vacuum_cost_limit: 200
59+
autovacuum_work_mem: 262144
60+
checkpoint_completion_target: "0.9"
61+
checkpoint_timeout: 15min
62+
dynamic_shared_memory_type: posix
63+
effective_cache_size: 524288
64+
hot_standby_feedback: "on"
65+
lolor.node: 1
66+
maintenance_work_mem: 137518
67+
max_parallel_workers: 8
68+
password_encryption: md5
69+
shared_buffers: 262144
70+
shared_preload_libraries: pg_stat_statements,spock
71+
snowflake.node: 1
72+
spock.allow_ddl_from_functions: "on"
73+
spock.conflict_log_level: DEBUG
74+
spock.conflict_resolution: last_update_wins
75+
spock.enable_ddl_replication: "on"
76+
spock.include_ddl_repset: "on"
77+
spock.save_resolutions: "on"
78+
ssl: "on"
79+
ssl_ca_file: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/ca.crt
80+
ssl_cert_file: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/server.crt
81+
ssl_key_file: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/server.key
82+
track_io_timing: "on"
83+
wal_log_hints: "on"
84+
wal_sender_timeout: 5s
85+
pg_hba:
86+
- local all all trust
87+
- host all all 127.0.0.1/32 trust
88+
- host all all ::1/128 trust
89+
- local replication all trust
90+
- host replication all 127.0.0.1/32 trust
91+
- host replication all ::1/128 trust
92+
- hostssl all pgedge,patroni_replicator 10.10.0.2/32 cert clientcert=verify-full
93+
- hostssl replication pgedge,patroni_replicator 10.10.0.2/32 cert clientcert=verify-full
94+
- hostssl all pgedge,patroni_replicator 10.10.0.3/32 cert clientcert=verify-full
95+
- hostssl replication pgedge,patroni_replicator 10.10.0.3/32 cert clientcert=verify-full
96+
- hostssl all pgedge,patroni_replicator 10.10.0.4/32 cert clientcert=verify-full
97+
- hostssl replication pgedge,patroni_replicator 10.10.0.4/32 cert clientcert=verify-full
98+
- host all pgedge,patroni_replicator 0.0.0.0/0 reject
99+
- host all pgedge,patroni_replicator ::0/0 reject
100+
- host all all 0.0.0.0/0 md5
101+
use_pg_rewind: true
102+
remove_data_directory_on_rewind_failure: true
103+
remove_data_directory_on_diverged_timelines: true
104+
basebackup:
105+
- checkpoint: fast
106+
restapi:
107+
connect_address: storefront-n1-689qacsi.storefront-database:8888
108+
listen: 0.0.0.0:8888
109+
allowlist:
110+
- 10.10.0.2
111+
- 10.10.0.3
112+
- 10.10.0.4
113+
- 127.0.0.1
114+
- localhost
115+
watchdog:
116+
mode: "off"

0 commit comments

Comments
 (0)