-
Notifications
You must be signed in to change notification settings - Fork 7
Detect external bootc status changes via fsnotify (milestone 4d) #59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,17 +58,23 @@ type BootcNodeReconciler struct { | |
| stageDone chan event.GenericEvent | ||
| // rebootIssued tracks whether a reboot has been issued so classifyAction | ||
| // can distinguish the Staged→Rebooting. | ||
| rebootIssued bool | ||
| rebootIssued bool | ||
| StatusChanged chan event.GenericEvent | ||
| } | ||
|
|
||
| func (r *BootcNodeReconciler) SetupWithManager(mgr ctrl.Manager) error { | ||
| r.stageDone = make(chan event.GenericEvent, 1) | ||
|
|
||
| return ctrl.NewControllerManagedBy(mgr). | ||
| builder := ctrl.NewControllerManagedBy(mgr). | ||
| For(&bootcv1alpha1.BootcNode{}). | ||
| WatchesRawSource(source.Channel(r.stageDone, &handler.EnqueueRequestForObject{})). | ||
| Named("bootcnode"). | ||
| Complete(r) | ||
| Named("bootcnode") | ||
|
|
||
| if r.StatusChanged != nil { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When would this be nil? |
||
| builder = builder.WatchesRawSource(source.Channel(r.StatusChanged, &handler.EnqueueRequestForObject{})) | ||
| } | ||
|
|
||
| return builder.Complete(r) | ||
| } | ||
|
|
||
| func (r *BootcNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,135 @@ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| package daemon | ||
|
|
||
| import ( | ||
| "context" | ||
| "os" | ||
| "time" | ||
|
|
||
| "github.com/fsnotify/fsnotify" | ||
| "github.com/go-logr/logr" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| logf "sigs.k8s.io/controller-runtime/pkg/log" | ||
|
|
||
| bootcv1alpha1 "github.com/jlebon/bootc-operator/api/v1alpha1" | ||
| "sigs.k8s.io/controller-runtime/pkg/event" | ||
| ) | ||
|
|
||
// Default watch locations. Both are rooted at /proc/1/root, which presumably
// exposes the host filesystem to the daemon (NOTE(review): confirm against the
// daemon's deployment).
const (
	// DefaultPrimaryPath is the default path associated with the ostree backend.
	DefaultPrimaryPath = "/proc/1/root/ostree/bootc"

	// DefaultFallbackPath is the default path associated with the composefs backend.
	DefaultFallbackPath = "/proc/1/root/sysroot/state/deploy"
)
|
|
||
| type StatusWatcher struct { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So the way I was thinking about this was more like if it were an "informer cache" for bootc status. So here we would maintain the deserialized status, and the daemon, whenever it needed it, would query the status watcher, which either returns the in-memory object (if fsnotify is known healthy) or runs `bootc status` directly. The polling fallback still makes sense I guess to keep the BootcNode status up to date even if the daemon itself doesn't need to do any work. But ideally we only start polling once fsnotify fails? WDYT? |
||
| PollInterval time.Duration | ||
| PrimaryPath string | ||
| FallbackPath string | ||
| Events chan event.GenericEvent | ||
| NodeName string | ||
| Ready chan struct{} | ||
| } | ||
|
|
||
| func (w *StatusWatcher) Start(ctx context.Context) error { | ||
| log := logf.FromContext(ctx).WithName("status-watcher") | ||
|
|
||
| watchPath := w.resolveWatchPath() | ||
|
|
||
| fsWatcher := w.setupFsnotify(log, watchPath) | ||
|
|
||
| closeFsWatcher := func() { | ||
| if fsWatcher != nil { | ||
| _ = fsWatcher.Close() | ||
| fsWatcher = nil | ||
| } | ||
| } | ||
| defer closeFsWatcher() | ||
|
|
||
| if w.PollInterval <= 0 { | ||
| w.PollInterval = 5 * time.Minute | ||
| } | ||
|
|
||
| ticker := time.NewTicker(w.PollInterval) | ||
| defer ticker.Stop() | ||
|
|
||
| var evCh <-chan fsnotify.Event | ||
| var errCh <-chan error | ||
| if fsWatcher != nil { | ||
| evCh = fsWatcher.Events | ||
| errCh = fsWatcher.Errors | ||
| } | ||
|
|
||
| if w.Ready != nil { | ||
| close(w.Ready) | ||
| } | ||
|
|
||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return nil | ||
| case ev := <-evCh: | ||
| // bootc updates modify directory mtime, which inotify reports as IN_ATTRIB (Chmod). | ||
| if ev.Has(fsnotify.Chmod) { | ||
| log.V(1).Info("Detected bootc status change via fsnotify") | ||
| w.sendEvent() | ||
| } | ||
| // Tear down fsnotify so the loop continues with polling only. | ||
| // A broken inotify fd never delivers events again, so without this | ||
| // the watcher silently stops reacting to filesystem changes. | ||
| case err := <-errCh: | ||
| log.Error(err, "fsnotify error, degrading to polling only") | ||
| closeFsWatcher() | ||
| evCh = nil | ||
| errCh = nil | ||
| case <-ticker.C: | ||
| log.V(1).Info("Polling bootc status") | ||
| w.sendEvent() | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (w *StatusWatcher) setupFsnotify(log logr.Logger, watchPath string) *fsnotify.Watcher { | ||
| if watchPath == "" { | ||
| log.Info("No bootc status path found, using polling only") | ||
| return nil | ||
| } | ||
|
|
||
| fsWatcher, err := fsnotify.NewWatcher() | ||
| if err != nil { | ||
| log.Error(err, "Failed to create fsnotify watcher, falling back to polling") | ||
| return nil | ||
| } | ||
|
|
||
| if err := fsWatcher.Add(watchPath); err != nil { | ||
| log.Error(err, "Failed to watch path, falling back to polling", "path", watchPath) | ||
| _ = fsWatcher.Close() | ||
| return nil | ||
| } | ||
|
|
||
| log.Info("Watching path for bootc status changes", "path", watchPath) | ||
| return fsWatcher | ||
| } | ||
|
|
||
| func (w *StatusWatcher) resolveWatchPath() string { | ||
| if _, err := os.Stat(w.PrimaryPath); err == nil { | ||
| return w.PrimaryPath | ||
| } | ||
| if _, err := os.Stat(w.FallbackPath); err == nil { | ||
| return w.FallbackPath | ||
| } | ||
| return "" | ||
| } | ||
|
|
||
| func (w *StatusWatcher) sendEvent() { | ||
| ev := event.GenericEvent{ | ||
| Object: &bootcv1alpha1.BootcNode{ | ||
| ObjectMeta: metav1.ObjectMeta{Name: w.NodeName}, | ||
| }, | ||
| } | ||
| select { | ||
| case w.Events <- ev: | ||
| default: | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,137 @@ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| package daemon | ||
|
|
||
| import ( | ||
| "context" | ||
| "os" | ||
| "path/filepath" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "sigs.k8s.io/controller-runtime/pkg/event" | ||
| ) | ||
|
|
||
| func startWatcher(t *testing.T, w *StatusWatcher) (done <-chan error, cancel context.CancelFunc) { | ||
| t.Helper() | ||
| ctx, cancel := context.WithCancel(context.Background()) | ||
| ch := make(chan error, 1) | ||
| go func() { ch <- w.Start(ctx) }() | ||
| <-w.Ready | ||
| return ch, cancel | ||
| } | ||
|
|
||
| func TestWatcherEvents(t *testing.T) { | ||
| tests := []struct { | ||
| name string | ||
| mkPrimary bool | ||
| mkFallback bool | ||
| touchPrimary bool | ||
| touchFallback bool | ||
| pollInterval time.Duration | ||
| }{ | ||
| { | ||
| name: "Fsnotify", | ||
| mkPrimary: true, | ||
| touchPrimary: true, | ||
| pollInterval: 10 * time.Minute, | ||
| }, | ||
| { | ||
| name: "FallbackPath", | ||
| mkFallback: true, | ||
| touchFallback: true, | ||
| pollInterval: 10 * time.Minute, | ||
| }, | ||
| { | ||
| name: "PollOnly", | ||
| pollInterval: 200 * time.Millisecond, | ||
| }, | ||
| } | ||
|
|
||
| for _, tt := range tests { | ||
| t.Run(tt.name, func(t *testing.T) { | ||
| dir := t.TempDir() | ||
| primaryPath := filepath.Join(dir, "bootc") | ||
| fallbackPath := filepath.Join(dir, "deploy") | ||
|
|
||
| if tt.mkPrimary { | ||
| if err := os.Mkdir(primaryPath, 0o755); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| } | ||
| if tt.mkFallback { | ||
| if err := os.Mkdir(fallbackPath, 0o755); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| } | ||
|
|
||
| events := make(chan event.GenericEvent, 1) | ||
| w := &StatusWatcher{ | ||
| PollInterval: tt.pollInterval, | ||
| PrimaryPath: primaryPath, | ||
| FallbackPath: fallbackPath, | ||
| Events: events, | ||
| NodeName: "test-node", | ||
| Ready: make(chan struct{}), | ||
| } | ||
|
|
||
| done, cancel := startWatcher(t, w) | ||
| defer cancel() | ||
|
|
||
| now := time.Now() | ||
| if tt.touchPrimary { | ||
| if err := os.Chtimes(primaryPath, now, now); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| } | ||
| if tt.touchFallback { | ||
| if err := os.Chtimes(fallbackPath, now, now); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| } | ||
|
|
||
| select { | ||
| case ev := <-events: | ||
| if ev.Object.GetName() != "test-node" { | ||
| t.Errorf("expected node name test-node, got %s", ev.Object.GetName()) | ||
| } | ||
| case <-time.After(5 * time.Second): | ||
| t.Fatal("timed out waiting for event") | ||
| } | ||
|
|
||
| cancel() | ||
| if err := <-done; err != nil { | ||
| t.Fatalf("watcher returned error: %v", err) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func TestWatcherShutdown(t *testing.T) { | ||
| dir := t.TempDir() | ||
| watchDir := filepath.Join(dir, "bootc") | ||
| if err := os.Mkdir(watchDir, 0o755); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| w := &StatusWatcher{ | ||
| PollInterval: 10 * time.Minute, | ||
| PrimaryPath: watchDir, | ||
| FallbackPath: filepath.Join(dir, "nonexistent"), | ||
| Events: make(chan event.GenericEvent, 1), | ||
| NodeName: "test-node", | ||
| Ready: make(chan struct{}), | ||
| } | ||
|
|
||
| done, cancel := startWatcher(t, w) | ||
| cancel() | ||
|
|
||
| select { | ||
| case err := <-done: | ||
| if err != nil { | ||
| t.Fatalf("watcher returned error on shutdown: %v", err) | ||
| } | ||
| case <-time.After(5 * time.Second): | ||
| t.Fatal("timed out waiting for watcher to shut down") | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps
bootc-poll-interval?