-
Notifications
You must be signed in to change notification settings - Fork 407
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement multiplexer proxy. #2141
base: master
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #2141 /- ##
===========================================
- Coverage 58.93% 45.28% -13.65%
===========================================
Files 210 420 210
Lines 18968 28631 9663
===========================================
Hits 11179 12966 1787
- Misses 6707 14367 7660
- Partials 1082 1298 216
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
44acb84
to
8ecdab1
Compare
Quality Gate passedIssues Measures |
4171c61
to
aed7925
Compare
4f1ad5f
to
d6c4a47
Compare
pkg/yurthub/util/util.go
Outdated
@@ -86,13 86,15 @@ const ( | |||
CacheUserAgentsKey = "cache_agents" | |||
PoolScopeResourcesKey = "pool_scope_resources" | |||
|
|||
MultiplexerProxyClientUserAgent = "multiplexer-proxy" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about name MultiplexerProxyClientUserAgent
to fmt.Sprintf("multiplexer-proxy-%s", strings.ToLower(nodeName))?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
func (sm *storageManager) restClient(gvr *schema.GroupVersionResource) (rest.Interface, error) { | ||
httpClient, _ := rest.HTTPClientFor(sm.config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should not skip err here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
return nil, errors.Wrapf(err, "failed to get rest client for %v", gvr) | ||
} | ||
|
||
rs := &store{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rs := NewStore(restClient, gvr.Resource)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
a2f0237
to
a3d603c
Compare
cmd/yurthub/app/config/config.go
Outdated
"github.com/openyurtio/openyurt/pkg/yurthub/network" | ||
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" | ||
"github.com/openyurtio/openyurt/pkg/yurthub/util" | ||
) | ||
|
||
var DefaultMultiplexerResources = []schema.GroupVersionResource{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DefaultMultiplexerResources --> AllowedMultiplexerResources
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
cmd/yurthub/app/config/config.go
Outdated
@@ -101,6 117,9 @@ type YurtHubConfiguration struct { | |||
CoordinatorClient kubernetes.Interface | |||
LeaderElection componentbaseconfig.LeaderElectionConfiguration | |||
HostControlPlaneAddr string // ip:port | |||
PostStartHooks map[string]func() error | |||
MultiplexerCacheManager multiplexer.MultiplexerManager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MultiplexerCacheManager --> RequestMultiplexerManager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
type NodeGetter func(name string) (*v1.Node, error) | ||
|
||
type UnionObjectFilter []ObjectFilter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that we can use filterChain
instead of defining a new UnionObjectFilter.
type filterChain []filter.ObjectFilter |
by the way, maybe you can create a new folder named objectfilter and move filterchain
into this folder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zyjhtangtang please remove UnionObjectFilter
Quality Gate passedIssues Measures |
@@ -59,4 59,11 @@ type ObjectFilter interface { | |||
Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object | |||
} | |||
|
|||
type FilterManager interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about rename FilterManager to FilterFinder?
|
||
if info.Verb != "list" && info.Verb != "watch" { | ||
return false | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we also need to exclude requests with fieldSelector or labelSelector.
sp.multiplexerList(w, r, gvr) | ||
case "watch": | ||
sp.multiplexerWatch(w, r, gvr) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add default
case, like only support list/watch requests
.
|
||
type storageManager struct { | ||
config *rest.Config | ||
storageMap map[string]storage.Interface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
storageMap --> gvrToStorage
restStoreManager: restStoreManager, | ||
restMapper: kmeta.NewDefaultRESTMapperFromScheme(), | ||
cacheMap: make(map[string]Interface), | ||
cacheConfigMap: make(map[string]*ResourceCacheConfig), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cacheConfigMap --> gvrToCacheConfig
return &multiplexerManager{ | ||
restStoreManager: restStoreManager, | ||
restMapper: kmeta.NewDefaultRESTMapperFromScheme(), | ||
cacheMap: make(map[string]Interface), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cacheMap --> gvrToCache
restMapper: kmeta.NewDefaultRESTMapperFromScheme(), | ||
cacheMap: make(map[string]Interface), | ||
cacheConfigMap: make(map[string]*ResourceCacheConfig), | ||
cacheDestroyFuncMap: make(map[string]func()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cacheDestroyFuncMap --> gvrToCacheDestroyFunc
} | ||
} | ||
func (m *multiplexerManager) ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) { | ||
if config, ok := m.cacheConfigMap[gvr.String()]; ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we need to add a sync lock for cacheConfigMap in order to get rid of race conditioning.
} | ||
|
||
func (sp *multiplexerProxy) filterListObject(obj runtime.Object, filter filter.ObjectFilter) (runtime.Object, error) { | ||
if filter == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
filter may not be nil because of it is a filter.ObjectFilter. so please use yurtutil.IsNil()
func to verify.
What type of PR is this?
What this PR does / why we need it:
Implement the shared cache module in node-level traffic multiplexing, referencing the Design Proposal.
Does this PR introduce a user-facing change?
"NONE"