Skip to content

Commit

Permalink
Polishing cluster (#146)
Browse files Browse the repository at this point in the history
* Polishing cluster

* test selectMemberFunc
  • Loading branch information
anthdm committed Jan 14, 2024
1 parent d86dd35 commit 5a41e08
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 125 deletions.
67 changes: 67 additions & 0 deletions cluster/activation.go
Original file line number Diff line number Diff line change
@@ -0,0 1,67 @@
package cluster

import (
fmt "fmt"
"math"
"math/rand"
)

// ActivationConfig...
type ActivationConfig struct {
id string
region string
selectMember SelectMemberFunc
}

// NewActivationConfig returns a new default config.
func NewActivationConfig() ActivationConfig {
return ActivationConfig{
id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)),
region: "default",
selectMember: SelectRandomMember,
}
}

// WithSelectMemberFunc set's the fuction that will be invoked during
// the activation process.
// It will select the member where the actor will be activated/spawned on.
func (config ActivationConfig) WithSelectMemberFunc(fun SelectMemberFunc) ActivationConfig {
config.selectMember = fun
return config
}

// WithID set's the id of the actor that will be activated on the cluster.
//
// Defaults to a random identifier.
func (config ActivationConfig) WithID(id string) ActivationConfig {
config.id = id
return config
}

// WithRegion set's the region on where this actor should be spawned.
//
// Defaults to a "default".
func (config ActivationConfig) WithRegion(region string) ActivationConfig {
config.region = region
return config
}

// SelectMemberFunc will be invoked during the activation process.
// Given the ActivationDetails the actor will be spawned on the returned member.
type SelectMemberFunc func(ActivationDetails) *Member

// ActivationDetails holds detailed information about an activation.
type ActivationDetails struct {
// Region where the actor should be activated on
Region string
// A slice of members that are pre-filtered by the kind of the actor
// that need to be activated
Members []*Member
// The kind of the actor
Kind string
}

// SelectRandomMember selects a random member of the cluster.
func SelectRandomMember(details ActivationDetails) *Member {
return details.Members[rand.Intn(len(details.Members))]
}
35 changes: 0 additions & 35 deletions cluster/activator.go

This file was deleted.

22 changes: 12 additions & 10 deletions cluster/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 11,7 @@ import (
type (
activate struct {
kind string
id string
region string
config ActivationConfig
}
getMembers struct{}
getKinds struct{}
Expand Down Expand Up @@ -60,7 59,7 @@ func (a *Agent) Receive(c *actor.Context) {
case *Activation:
a.handleActivation(msg)
case activate:
pid := a.activate(msg.kind, msg.id, msg.region)
pid := a.activate(msg.kind, msg.config)
c.Respond(pid)
case deactivate:
a.bcast(&Deactivation{PID: msg.pid})
Expand Down Expand Up @@ -117,27 116,30 @@ func (a *Agent) handleActivationRequest(msg *ActivationRequest) *ActivationRespo
return resp
}

func (a *Agent) activate(kind, id, region string) *actor.PID {
func (a *Agent) activate(kind string, config ActivationConfig) *actor.PID {
members := a.members.FilterByKind(kind)
if len(members) == 0 {
slog.Warn("could not find any members with kind", "kind", kind)
return nil
}
owner := a.cluster.config.activationStrategy.ActivateOnMember(ActivationDetails{
if config.selectMember == nil {
config.selectMember = SelectRandomMember
}
memberPID := config.selectMember(ActivationDetails{
Members: members,
Region: region,
Region: config.region,
Kind: kind,
})
if owner == nil {
if memberPID == nil {
slog.Warn("activator did not found a member to activate on")
return nil
}
req := &ActivationRequest{Kind: kind, ID: id}
activatorPID := actor.NewPID(owner.Host, "cluster/" owner.ID)
req := &ActivationRequest{Kind: kind, ID: config.id}
activatorPID := actor.NewPID(memberPID.Host, "cluster/" memberPID.ID)

var activationResp *ActivationResponse
// Local activation
if owner.Host == a.cluster.engine.Address() {
if memberPID.Host == a.cluster.engine.Address() {
activationResp = a.handleActivationRequest(req)
} else {
// Remote activation
Expand Down
91 changes: 26 additions & 65 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 22,22 @@ type Producer func(c *Cluster) actor.Producer

// Config holds the cluster configuration
type Config struct {
listenAddr string
id string
region string
activationStrategy ActivationStrategy
engine *actor.Engine
provider Producer
requestTimeout time.Duration
listenAddr string
id string
region string
engine *actor.Engine
provider Producer
requestTimeout time.Duration
}

// NewConfig returns a Config that is initialized with default values.
func NewConfig() Config {
return Config{
listenAddr: getRandomListenAddr(),
id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)),
region: "default",
activationStrategy: NewDefaultActivationStrategy(),
provider: NewSelfManagedProvider(NewSelfManagedConfig()),
requestTimeout: defaultRequestTimeout,
listenAddr: getRandomListenAddr(),
id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)),
region: "default",
provider: NewSelfManagedProvider(NewSelfManagedConfig()),
requestTimeout: defaultRequestTimeout,
}
}

Expand Down Expand Up @@ -71,14 69,6 @@ func (config Config) WithEngine(e *actor.Engine) Config {
return config
}

// TODO: Still not convinced about the name "ActivationStrategy".
// TODO: Document this more.
// WithActivationStrategy
func (config Config) WithActivationStrategy(s ActivationStrategy) Config {
config.activationStrategy = s
return config
}

// WithListenAddr set's the listen address of the underlying remote.
//
// Defaults to a random port number.
Expand Down Expand Up @@ -160,42 150,15 @@ func (c *Cluster) Spawn(p actor.Producer, id string, opts ...actor.OptFunc) *act
return pid
}

type ActivationConfig struct {
id string
region string
}

// NewActivationConfig returns a new default config.
func NewActivationConfig() ActivationConfig {
return ActivationConfig{
id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)),
region: "default",
}
}

// WithID set's the id of the actor that will be activated on the cluster.
// Activate actives the registered kind in the cluster based on the given config.
// The actor does not need to be registered locally on the member if at least one
// member has that kind registered.
//
// Defaults to a random identifier.
func (config ActivationConfig) WithID(id string) ActivationConfig {
config.id = id
return config
}

// WithRegion set's the region on where this actor (potentially) will be spawned
//
// Defaults to a "default".
func (config ActivationConfig) WithRegion(region string) ActivationConfig {
config.region = region
return config
}

// Activate actives the given actor kind with an optional id. If there is no id
// given, the engine will create an unique id automatically.
// playerPID := cluster.Activate("player", cluster.NewActivationConfig())
func (c *Cluster) Activate(kind string, config ActivationConfig) *actor.PID {
msg := activate{
kind: kind,
id: config.id,
region: config.region,
config: config,
}
resp, err := c.engine.Request(c.agentPID, msg, c.config.requestTimeout).Result()
if err != nil {
Expand All @@ -215,23 178,21 @@ func (c *Cluster) Deactivate(pid *actor.PID) {
c.engine.Send(c.agentPID, deactivate{pid: pid})
}

// RegisterKind registers a new actor/receiver kind that can be spawned from any node
// on the cluster.
// NOTE: Kinds can only be registered if the cluster is not running.
func (c *Cluster) RegisterKind(name string, producer actor.Producer, config *KindConfig) {
// RegisterKind registers a new actor that can be activated from any member
// in the cluster.
//
// cluster.Register("player", NewPlayer, NewKindConfig())
//
// NOTE: Kinds can only be registered before the cluster is started.
func (c *Cluster) RegisterKind(kind string, producer actor.Producer, config KindConfig) {
if c.isStarted {
slog.Warn("trying to register new kind on a running cluster")
slog.Warn("failed to register kind", "reason", "cluster already started", "kind", kind)
return
}
if config == nil {
config = &KindConfig{}
}
kind := newKind(name, producer, *config)
c.kinds = append(c.kinds, kind)
c.kinds = append(c.kinds, newKind(kind, producer, config))
}

// HasLocalKind returns true if this members of the cluster has the kind
// locally registered.
// HasKindLocal returns true whether the node of the cluster has the kind locally registered.
func (c *Cluster) HasKindLocal(name string) bool {
for _, kind := range c.kinds {
if kind.name == name {
Expand Down
Loading

0 comments on commit 5a41e08

Please sign in to comment.