mirror of
https://github.com/docker/compose.git
synced 2026-05-13 13:58:02 +00:00
Refactor watcher: per-path triggers and intersectPathMatcher
- Add multiNotify and watcher methods for grouped per-path watches - Centralize per-path multi-watcher behavior so the naive watcher flow stays the same - Add intersectPathMatcher implementing PathMatcher for shared roots - Remove unnecessary conditions in the naive watcher as per feedback Signed-off-by: ManManavadaria <manmanavadaria@gmail.com>
This commit is contained in:
parent
f635b3a609
commit
62a24caecd
5 changed files with 224 additions and 33 deletions
|
|
@ -198,10 +198,10 @@ func (s *composeService) watch(ctx context.Context, project *types.Project, opti
|
|||
eg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
var (
|
||||
rules []watchRule
|
||||
paths []string
|
||||
ignoreMatchers []watch.PathMatcher
|
||||
rules []watchRule
|
||||
watchRootIgnores map[string][]watch.PathMatcher // abs watch path -> per-trigger ignore matchers
|
||||
)
|
||||
watchRootIgnores = make(map[string][]watch.PathMatcher)
|
||||
for serviceName, service := range project.Services {
|
||||
config, err := loadDevelopmentConfig(service, project)
|
||||
if err != nil {
|
||||
|
|
@ -259,8 +259,13 @@ func (s *composeService) watch(ctx context.Context, project *types.Project, opti
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ignoreMatchers = append(ignoreMatchers, ignore)
|
||||
paths = append(paths, trigger.Path)
|
||||
|
||||
absPath, err := filepath.Abs(trigger.Path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
absPath = filepath.Clean(absPath)
|
||||
watchRootIgnores[absPath] = append(watchRootIgnores[absPath], ignore)
|
||||
}
|
||||
|
||||
serviceWatchRules, err := getWatchRules(config, service)
|
||||
|
|
@ -270,15 +275,25 @@ func (s *composeService) watch(ctx context.Context, project *types.Project, opti
|
|||
rules = append(rules, serviceWatchRules...)
|
||||
}
|
||||
|
||||
if len(paths) == 0 {
|
||||
if len(watchRootIgnores) == 0 {
|
||||
return nil, fmt.Errorf("none of the selected services is configured for watch, consider setting a 'develop' section")
|
||||
}
|
||||
|
||||
watcher, err := watch.NewWatcher(paths, watch.NewCompositeMatcher(ignoreMatchers...))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
watchers := make([]watch.Notify, 0, len(watchRootIgnores))
|
||||
for path, matchers := range watchRootIgnores {
|
||||
merged := watch.NewIntersectMatcher(matchers...)
|
||||
triggerWatcher, err := watch.NewWatcher([]string{path}, merged)
|
||||
if err != nil {
|
||||
for _, w := range watchers {
|
||||
_ = w.Close()
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
watchers = append(watchers, triggerWatcher)
|
||||
}
|
||||
|
||||
watcher := watch.NewMultiWatcher(watchers...)
|
||||
|
||||
err = watcher.Start()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
|||
|
|
@ -17,11 +17,13 @@
|
|||
package watch
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var numberOfWatches = expvar.NewInt("watch.naive.numberOfWatches")
|
||||
|
|
@ -84,6 +86,83 @@ func NewWatcher(paths []string, ignore PathMatcher) (Notify, error) {
|
|||
return newWatcher(paths, ignore)
|
||||
}
|
||||
|
||||
type multiNotify struct {
|
||||
children []Notify
|
||||
events chan FileEvent
|
||||
errors chan error
|
||||
}
|
||||
|
||||
func NewMultiWatcher(children ...Notify) Notify {
|
||||
return &multiNotify{
|
||||
children: children,
|
||||
events: make(chan FileEvent),
|
||||
errors: make(chan error),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *multiNotify) Start() error {
|
||||
for i := range m.children {
|
||||
if err := m.children[i].Start(); err != nil {
|
||||
for j := 0; j < i; j++ {
|
||||
_ = m.children[j].Close()
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
var eventsWG sync.WaitGroup
|
||||
eventsWG.Add(len(m.children))
|
||||
for i := range m.children {
|
||||
child := m.children[i]
|
||||
go func() {
|
||||
defer eventsWG.Done()
|
||||
for e := range child.Events() {
|
||||
m.events <- e
|
||||
}
|
||||
}()
|
||||
}
|
||||
go func() {
|
||||
eventsWG.Wait()
|
||||
close(m.events)
|
||||
}()
|
||||
|
||||
var errorsWG sync.WaitGroup
|
||||
errorsWG.Add(len(m.children))
|
||||
for i := range m.children {
|
||||
child := m.children[i]
|
||||
go func() {
|
||||
defer errorsWG.Done()
|
||||
for err := range child.Errors() {
|
||||
m.errors <- err
|
||||
}
|
||||
}()
|
||||
}
|
||||
go func() {
|
||||
errorsWG.Wait()
|
||||
close(m.errors)
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *multiNotify) Close() error {
|
||||
var errs []error
|
||||
for _, child := range m.children {
|
||||
if err := child.Close(); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
func (m *multiNotify) Events() chan FileEvent {
|
||||
return m.events
|
||||
}
|
||||
|
||||
func (m *multiNotify) Errors() chan error {
|
||||
return m.errors
|
||||
}
|
||||
|
||||
const WindowsBufferSizeEnvVar = "COMPOSE_WATCH_WINDOWS_BUFFER_SIZE"
|
||||
|
||||
const defaultBufferSize int = 65536
|
||||
|
|
@ -134,3 +213,48 @@ func (c CompositePathMatcher) MatchesEntireDir(f string) (bool, error) {
|
|||
}
|
||||
|
||||
var _ PathMatcher = CompositePathMatcher{}
|
||||
|
||||
// intersectPathMatcher matches iff every matcher matches. With several develop.watch
|
||||
// triggers on one watch root, skip/filter a path only when every trigger's ignores agree.
|
||||
type intersectPathMatcher struct {
|
||||
Matchers []PathMatcher
|
||||
}
|
||||
|
||||
// NewIntersectMatcher returns a PathMatcher that matches iff every matcher matches.
|
||||
func NewIntersectMatcher(matchers ...PathMatcher) PathMatcher {
|
||||
if len(matchers) == 0 {
|
||||
return EmptyMatcher{}
|
||||
}
|
||||
if len(matchers) == 1 {
|
||||
return matchers[0]
|
||||
}
|
||||
return intersectPathMatcher{Matchers: matchers}
|
||||
}
|
||||
|
||||
func (i intersectPathMatcher) Matches(f string) (bool, error) {
|
||||
for _, t := range i.Matchers {
|
||||
ret, err := t.Matches(f)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !ret {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (i intersectPathMatcher) MatchesEntireDir(f string) (bool, error) {
|
||||
for _, t := range i.Matchers {
|
||||
ret, err := t.MatchesEntireDir(f)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !ret {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
var _ PathMatcher = intersectPathMatcher{}
|
||||
|
|
|
|||
|
|
@ -50,6 +50,46 @@ func TestWindowsBufferSize(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestNewIntersectMatcher(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
|
||||
vendorOnly, err := DockerIgnoreTesterFromContents(root, "vendor/\n")
|
||||
assert.NilError(t, err)
|
||||
tmpOnly, err := DockerIgnoreTesterFromContents(root, "tmp/\n")
|
||||
assert.NilError(t, err)
|
||||
|
||||
inter := NewIntersectMatcher(vendorOnly, tmpOnly)
|
||||
|
||||
vendorFile := filepath.Join(root, "vendor", "a.go")
|
||||
matches, err := inter.Matches(vendorFile)
|
||||
assert.NilError(t, err)
|
||||
assert.Assert(t, !matches, "only one trigger ignores vendor; intersection must not treat path as ignored")
|
||||
|
||||
bothIgnoreBuild1, err := DockerIgnoreTesterFromContents(root, "build/\n")
|
||||
assert.NilError(t, err)
|
||||
bothIgnoreBuild2, err := DockerIgnoreTesterFromContents(root, "build/\n")
|
||||
assert.NilError(t, err)
|
||||
interBuild := NewIntersectMatcher(bothIgnoreBuild1, bothIgnoreBuild2)
|
||||
buildFile := filepath.Join(root, "build", "out")
|
||||
matches, err = interBuild.Matches(buildFile)
|
||||
assert.NilError(t, err)
|
||||
assert.Assert(t, matches)
|
||||
|
||||
dirEntire1, err := DockerIgnoreTesterFromContents(root, "cache/\n")
|
||||
assert.NilError(t, err)
|
||||
dirEntire2, err := DockerIgnoreTesterFromContents(root, "cache/\n")
|
||||
assert.NilError(t, err)
|
||||
interDir := NewIntersectMatcher(dirEntire1, dirEntire2)
|
||||
entire, err := interDir.MatchesEntireDir(filepath.Join(root, "cache"))
|
||||
assert.NilError(t, err)
|
||||
assert.Assert(t, entire)
|
||||
|
||||
partialEntire := NewIntersectMatcher(vendorOnly, tmpOnly)
|
||||
entire, err = partialEntire.MatchesEntireDir(filepath.Join(root, "vendor"))
|
||||
assert.NilError(t, err)
|
||||
assert.Assert(t, !entire, "must not skip whole dir unless every matcher agrees it is entirely ignorable")
|
||||
}
|
||||
|
||||
func TestNoEvents(t *testing.T) {
|
||||
f := newNotifyFixture(t)
|
||||
f.assertEvents()
|
||||
|
|
|
|||
|
|
@ -114,10 +114,6 @@ func (d *naiveNotify) watchRecursively(dir string) error {
|
|||
|
||||
return filepath.WalkDir(dir, func(path string, info fs.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
if os.IsPermission(err) && d.shouldIgnore(path) {
|
||||
logrus.Debugf("Ignoring path: %s", path)
|
||||
return filepath.SkipDir
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -185,10 +181,6 @@ func (d *naiveNotify) loop() { //nolint:gocyclo
|
|||
// TODO(dbentley): if there's a delete should we call d.watcher.Remove to prevent leaking?
|
||||
err := filepath.WalkDir(e.Name, func(path string, info fs.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
if os.IsPermission(err) && d.shouldIgnore(path) {
|
||||
logrus.Debugf("Ignoring path: %s", path)
|
||||
return filepath.SkipDir
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -228,6 +220,10 @@ func (d *naiveNotify) loop() { //nolint:gocyclo
|
|||
}
|
||||
|
||||
func (d *naiveNotify) shouldNotify(path string) bool {
|
||||
if d.shouldIgnore(path) {
|
||||
return false
|
||||
}
|
||||
|
||||
if _, ok := d.notifyList[path]; ok {
|
||||
// We generally don't care when directories change at the root of an ADD
|
||||
stat, err := os.Lstat(path)
|
||||
|
|
@ -249,21 +245,10 @@ func (d *naiveNotify) shouldSkipDir(path string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// Suppose we're watching
|
||||
// /src/.tiltignore
|
||||
// but the .tiltignore file doesn't exist.
|
||||
//
|
||||
// Our watcher will create an inotify watch on /src/.
|
||||
//
|
||||
// But then we want to make sure we don't recurse from /src/ down to /src/node_modules.
|
||||
//
|
||||
// To handle this case, we only want to traverse dirs that are:
|
||||
// - A child of a directory that's in our notify list, or
|
||||
// - A parent of a directory that's in our notify list
|
||||
// (i.e., to cover the "path doesn't exist" case).
|
||||
//
|
||||
// We prioritize "parent of watched path" checks before ignore checks so
|
||||
// one trigger's ignore rules can't hide another trigger's nested watch root.
|
||||
// Only walk directories that are under a notifyList path, or that are under an ancestor of one
|
||||
// (Start() may watch a parent when the target is missing or is a file).
|
||||
// Check ancestor/descendant vs notifyList before ignores
|
||||
// so ignore patterns cannot block reaching the watched root.
|
||||
isChildOfWatchedDir := false
|
||||
for root := range d.notifyList {
|
||||
if pathutil.IsChild(path, root) {
|
||||
|
|
@ -294,7 +279,7 @@ func (d *naiveNotify) shouldIgnoreEntireDir(path string) bool {
|
|||
if matches {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
return matches
|
||||
}
|
||||
|
||||
func (d *naiveNotify) shouldIgnore(path string) bool {
|
||||
|
|
|
|||
|
|
@ -211,3 +211,30 @@ func TestShouldSkipDirDoesNotSkipAncestorOfWatchedPath(t *testing.T) {
|
|||
parent := filepath.Join(repoRoot, "parent")
|
||||
assert.Assert(t, !d.shouldSkipDir(parent), "expected parent directory to remain traversable when it contains a watched path")
|
||||
}
|
||||
|
||||
func TestShouldSkipDirIntersectRequiresAllTriggersToAgree(t *testing.T) {
|
||||
repoRoot := t.TempDir()
|
||||
ignoreVendor, err := DockerIgnoreTesterFromContents(repoRoot, "vendor/\n")
|
||||
assert.NilError(t, err)
|
||||
ignoreTmp, err := DockerIgnoreTesterFromContents(repoRoot, "tmp/\n")
|
||||
assert.NilError(t, err)
|
||||
|
||||
d := &naiveNotify{
|
||||
ignore: NewIntersectMatcher(ignoreVendor, ignoreTmp),
|
||||
notifyList: map[string]bool{repoRoot: true},
|
||||
}
|
||||
|
||||
vendorDir := filepath.Join(repoRoot, "vendor")
|
||||
assert.Assert(t, !d.shouldSkipDir(vendorDir), "vendor must remain watched when another trigger does not ignore it")
|
||||
|
||||
ignoreBuild1, err := DockerIgnoreTesterFromContents(repoRoot, "build/\n")
|
||||
assert.NilError(t, err)
|
||||
ignoreBuild2, err := DockerIgnoreTesterFromContents(repoRoot, "build/\n")
|
||||
assert.NilError(t, err)
|
||||
d2 := &naiveNotify{
|
||||
ignore: NewIntersectMatcher(ignoreBuild1, ignoreBuild2),
|
||||
notifyList: map[string]bool{repoRoot: true},
|
||||
}
|
||||
buildDir := filepath.Join(repoRoot, "build")
|
||||
assert.Assert(t, d2.shouldSkipDir(buildDir), "when every trigger ignores the same subtree, watcher may skip it")
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue