Skip to content

Commit

Permalink
Simplify msgs, refactoring phase2prepare
Browse files Browse the repository at this point in the history
  • Loading branch information
r0qs committed Jun 26, 2016
1 parent 703e528 commit 024f747
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 14 deletions.
3 changes: 2 additions & 1 deletion src/main/scala/cfabcast/agents/Acceptor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 22,7 @@ trait Acceptor extends ActorLogging {
if ((!msg.value.get.isEmpty && oldState.vrnd < msg.rnd) || oldState.vval == None) {
val newState = oldState.copy(rnd = msg.rnd, vrnd = msg.rnd, vval = msg.value)
config.learners.values foreach (_ ! Msg2B(id, msg.instance, newState.rnd, newState.vval))
log.info("INSTANCE: {} - ROUND: {} - PHASE2B - {} accept COORDINATOR VALUE: {}", msg.instance, msg.rnd, id, newState.vval)
newState
} else {
oldState
Expand All @@ -47,7 48,7 @@ trait Acceptor extends ActorLogging {
}
val newState = oldState.copy(rnd = msg.rnd, vrnd = msg.rnd, vval = Some(value))
config.learners.values foreach (_ ! Msg2B(id, msg.instance, msg.rnd, newState.vval))
log.debug("INSTANCE: {} - ROUND: {} - PHASE2B - {} accept VALUE: {}", msg.instance, msg.rnd, id, newState.vval)
log.info("INSTANCE: {} - ROUND: {} - PHASE2B - {} accept PROPOSED VALUE: {}", msg.instance, msg.rnd, id, newState.vval)
newState
} else {
log.warning("INSTANCE: {} - {} PHASE2B2 message round: {} < state rnd: {}", msg.instance, id, msg.rnd, oldState.rnd)
Expand Down
24 changes: 11 additions & 13 deletions src/main/scala/cfabcast/agents/Proposer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 40,7 @@ trait Proposer extends ActorLogging {
if ((isCFProposerOf(msg.rnd) && prnd == msg.rnd && oldState.pval == None) && msg.value.get(msg.senderId) != Nil) {
// Phase 2A for CFProposers
val newState = oldState.copy(pval = msg.value)
log.info("INSTANCE: {} - ROUND: {} - PROPOSE ({})- VALUE: {}", msg.instance, msg.rnd, id, newState.pval)
((msg.rnd.cfproposers diff Set(self)) union config.acceptors.values.toSet).foreach(_ ! Msg2A(id, msg.instance, msg.rnd, msg.value))
newState
} else {
Expand Down Expand Up @@ -101,20 102,16 @@ trait Proposer extends ActorLogging {

def phase2Prepare(msg: Msg2S, state: Future[ProposerMeta], config: ClusterConfiguration)(implicit ec: ExecutionContext): Future[ProposerMeta] = async {
val oldState = await(state)
// TODO: verify if sender is a coordinatior, how? i don't really know yet
// FIXME: <= ???
// proposer3 - INSTANCE: 0 - PHASE2PREPARE - p3 not update pval, because prnd: < 2; -1394923365; -1326777805 , -1394923365 > is not lower than message ROUND: < 2; -1394923365; -1326777805 , -1394923365 >
// ROUND BUG: proposer3 - INSTANCE: 0 - PHASE2PREPARE - p3 not update pval, because prnd: < 2; -1394923365; -1326777805 , -1394923365 > is not lower than message ROUND: < 2; -1394923365; -1326777805 , -1394923365 >
if(prnd < msg.rnd) {
//FIXME: improve this update
self ! UpdatePRound(msg.rnd, crnd)
log.info("Proposer {} READY in ROUND: {}", id, msg.rnd)
if(msg.value.get.isEmpty) {
val newState = oldState.copy(pval = None)
// TODO: improve round updates!!!!
// maybe a future pipeTo self is better option
self ! UpdatePRound(msg.rnd, crnd)
newState
oldState.copy(pval = None)
} else {
val newState = oldState.copy(pval = msg.value)
self ! UpdatePRound(msg.rnd, crnd)
newState
oldState.copy(pval = msg.value)
}
} else {
log.debug("INSTANCE: {} - PHASE2PREPARE - {} not update pval, because prnd: {} is greater than message ROUND: {}", msg.instance, id, prnd, msg.rnd)
Expand Down Expand Up @@ -156,7 153,7 @@ trait Proposer extends ActorLogging {
} else if (proposed <= greatestInstance) {
proposed = greatestInstance 1
}
log.info("Proposer: {} -> DECIDED= {} , PROPOSED= {}", id, learnedInstances, proposed)
log.debug("Proposer: {} -> DECIDED= {} , PROPOSED= {}", id, learnedInstances, proposed)
// If not proposed and not learned nothing yet in this instance
val vmap: Option[VMap[AgentId, Values]] = Some(VMap(id -> value))
var instance = learnedInstances.next
Expand Down Expand Up @@ -222,6 219,7 @@ trait Proposer extends ActorLogging {
}
} else {
log.error("INSTANCE: {} - Received a vmap {} that not contains me: {} send by {}", msg.instance, vmap, id, learner)
context.stop(self)
}
}

Expand All @@ -241,7 239,7 @@ trait Proposer extends ActorLogging {
//prnd = prnd.copy(coordinator = rnd.coordinator)
coordinators = msg.coordinators
if (msg.coordinators contains self) {
log.debug("Iam a LEADER! My id is: {} - HASHCODE: {}", id, self.hashCode)
log.info("Iam a LEADER! My id is: {} - HASHCODE: {}", id, self.hashCode)
// Run configure phase (1)
implicit val timeout = Timeout(3 seconds)
val cfpSet: Future[Set[ActorRef]] = ask(context.parent, GetCFPs).mapTo[Set[ActorRef]]
Expand All @@ -267,7 265,7 @@ trait Proposer extends ActorLogging {
retry(self, msg)
}
} else {
log.debug("Iam NOT the LEADER! My id is {} - {}", id, self)
log.info("Iam NOT the LEADER! My id is {} - {}", id, self)
}

case msg: UpdateRound =>
Expand Down

0 comments on commit 024f747

Please sign in to comment.