Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leader redirect #176

Merged
merged 4 commits into from
Oct 3, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 93 additions & 31 deletions api/apihttp/apihttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 20,7 @@ package apihttp
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"time"

Expand All @@ -28,6 29,7 @@ import (
"github.com/bbva/qed/crypto/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/hashicorp/raft"
)

type ClientApi interface {
Expand All @@ -40,6 42,7 @@ type ClientApi interface {
QueryConsistency(start, end uint64) (*balloon.IncrementalProof, error)
ClusterInfo() *consensus.ClusterInfo
Info() *consensus.NodeInfo
IsLeader() bool
}

// HealthCheckResponse contains the response from HealthCheckHandler.
Expand Down Expand Up @@ -145,13 148,39 @@ func Add(api ClientApi) http.HandlerFunc {

// Wait for the response
response, err := api.Add(event.Event)
if err != nil {
switch err {
case nil:
break
case raft.ErrNotLeader:
fallthrough
case raft.ErrLeadershipLost:
var scheme protocol.Scheme
if r.TLS != nil {
scheme = protocol.Https
} else {
scheme = protocol.Http
}

shards, err := getShards(api, scheme)
if err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
gdiazlo marked this conversation as resolved.
Show resolved Hide resolved
return
}
out, err := json.Marshal(shards)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write(out)
http.Redirect(w, r, shards.Shards[shards.LeaderId].HTTPAddr, http.StatusMovedPermanently)
return
default:
http.Error(w, err.Error(), http.StatusPreconditionFailed)
return
}

snapshot := protocol.Snapshot(*response)

out, err := json.Marshal(&snapshot)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down Expand Up @@ -206,8 235,34 @@ func AddBulk(api ClientApi) http.HandlerFunc {

// Wait for the response
snapshotBulk, err := api.AddBulk(eventBulk.Events)
switch err {
case nil:
break
case raft.ErrNotLeader:
fallthrough
case raft.ErrLeadershipLost:
var scheme protocol.Scheme
if r.TLS != nil {
scheme = protocol.Https
} else {
scheme = protocol.Http
}

if err != nil {
shards, err := getShards(api, scheme)
if err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
gdiazlo marked this conversation as resolved.
Show resolved Hide resolved
return
}
out, err := json.Marshal(shards)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write(out)
http.Redirect(w, r, shards.Shards[shards.LeaderId].HTTPAddr, http.StatusMovedPermanently)
return
default:
http.Error(w, err.Error(), http.StatusPreconditionFailed)
return
}
Expand Down Expand Up @@ -429,6 484,35 @@ func Incremental(api ClientApi) http.HandlerFunc {
// }
// }
// }

func getShards(api ClientApi, scheme protocol.Scheme) (*protocol.Shards, error) {
clusterInfo := api.ClusterInfo()
if clusterInfo.LeaderId == "" {
return nil, fmt.Errorf("Leader not found!")
}

if len(clusterInfo.Nodes) == 0 {
return nil, fmt.Errorf("Nodes not found")
}

nodeInfo := api.Info()
shardDetails := make(map[string]protocol.ShardDetail)

for _, node := range clusterInfo.Nodes {
shardDetails[node.NodeId] = protocol.ShardDetail{
NodeId: node.NodeId,
HTTPAddr: node.HttpAddr,
}
}

return &protocol.Shards{
NodeId: nodeInfo.NodeId,
LeaderId: clusterInfo.LeaderId,
URIScheme: scheme,
Shards: shardDetails,
}, nil
}

func InfoShardsHandler(api ClientApi) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
InfoShardsRequest.Inc()
Expand All @@ -441,40 525,18 @@ func InfoShardsHandler(api ClientApi) http.HandlerFunc {
return
}

var scheme string
var scheme protocol.Scheme
if r.TLS != nil {
scheme = "https"
scheme = protocol.Https
} else {
scheme = "http"
scheme = protocol.Http
}

clusterInfo := api.ClusterInfo()
if clusterInfo.LeaderId == "" {
http.Error(w, "Leader not found", http.StatusServiceUnavailable)
return
}

if len(clusterInfo.Nodes) == 0 {
http.Error(w, "Nodes not found", http.StatusServiceUnavailable)
shards, err := getShards(api, scheme)
if err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
gdiazlo marked this conversation as resolved.
Show resolved Hide resolved
return
}

nodeInfo := api.Info()
shardDetails := make(map[string]protocol.ShardDetail)

for _, node := range clusterInfo.Nodes {
shardDetails[node.NodeId] = protocol.ShardDetail{
NodeId: node.NodeId,
HTTPAddr: node.HttpAddr,
}
}

shards := &protocol.Shards{
NodeId: nodeInfo.NodeId,
LeaderId: clusterInfo.LeaderId,
URIScheme: protocol.Scheme(scheme),
Shards: shardDetails,
}
out, err := json.Marshal(shards)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
4 changes: 4 additions & 0 deletions api/apihttp/apihttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 158,10 @@ func (b fakeRaftBalloon) Backup() error {
return nil
}

func (b fakeRaftBalloon) IsLeader() bool {
return false
}

func (b fakeRaftBalloon) ListBackups() []*storage.BackupInfo {
return nil
}
Expand Down
34 changes: 34 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 56,33 @@ type HTTPClient struct {
discoveryStopCh chan bool // notify sniffer to stop, and notify back
}

func newCheckRedirect(client *HTTPClient) func(req *http.Request, via []*http.Request) error {
return func(req *http.Request, via []*http.Request) error {
var shards protocol.Shards
body, err := ioutil.ReadAll(req.Response.Body)
if err != nil {
return err
}
err = json.Unmarshal(body, &shards)
if err != nil {
return err
}

var primary string
secondaries := make([]string, 0)
for id, shard := range shards.Shards {
if id == shards.LeaderId {
primary = fmt.Sprintf("%s://%s", shards.URIScheme, shard.HTTPAddr)
} else {
secondaries = append(secondaries, fmt.Sprintf("%s://%s", shards.URIScheme, shard.HTTPAddr))
}
}
client.topology.Update(primary, secondaries...)
req.Host = req.URL.Host
return nil
}
}

// NewSimpleHTTPClient creates a new short-lived client thath can be
// used in use cases where you need one client per request.
//
Expand Down Expand Up @@ -103,6 130,9 @@ func NewSimpleHTTPClient(httpClient *http.Client, urls []string, snapshotStoreUR
log: log.L(),
}

// updates topology with redirect body
client.httpClient.CheckRedirect = newCheckRedirect(client)

client.topology.Update(urls[0], urls[1:]...)

return client, nil
Expand Down Expand Up @@ -149,6 179,10 @@ func NewHTTPClient(options ...HTTPClientOptionF) (*HTTPClient, error) {
discoveryStopCh: make(chan bool),
log: log.L(),
}

// updates topology with redirect body
client.httpClient.CheckRedirect = newCheckRedirect(client)

// Run the options on the client
for _, option := range options {
if err := option(client); err != nil {
Expand Down
104 changes: 104 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 23,8 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strings"
"testing"
"time"
Expand All @@ -31,13 33,22 @@ import (

"github.com/bbva/qed/balloon"
"github.com/bbva/qed/crypto/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/testutils/spec"
"github.com/pkg/errors"

"github.com/bbva/qed/protocol"
"github.com/stretchr/testify/assert"
)

func TestMain(m *testing.M) {
log.SetDefault(log.New(&log.LoggerOptions{
IncludeLocation: true,
Level: log.Off,
}))
os.Exit(m.Run())
}

func setupServer(input []byte) (string, func()) {
mux := http.NewServeMux()
server := httptest.NewServer(mux)
Expand Down Expand Up @@ -288,6 299,99 @@ func TestCallPrimaryUnreachableWithDiscovery(t *testing.T) {
require.Equal(t, 3, numRequests, "The number of requests should match")
}

// Having 2 nodes with the primary being a follower, and with
// there should be an http redirect and the client should try
// the leader automatically
func TestCallPrimaryRedirect(t *testing.T) {

var priReqs, secReqs int

var info1, info2 protocol.Shards

info1 = protocol.Shards{
NodeId: "primary1",
LeaderId: "secondary1",
URIScheme: "http",
Shards: map[string]protocol.ShardDetail{
"primary1": protocol.ShardDetail{
NodeId: "primary1",
HTTPAddr: "primary1.foo",
},
"secondary1": protocol.ShardDetail{
NodeId: "secondary1",
HTTPAddr: "secondary1.foo",
},
},
}

info2 = protocol.Shards{
NodeId: "secondary1",
LeaderId: "secondary1",
URIScheme: "http",
Shards: map[string]protocol.ShardDetail{
"secondary1": protocol.ShardDetail{
NodeId: "secondary1",
HTTPAddr: "secondary1.foo",
},
},
}

httpClient := NewTestHttpClient(func(req *http.Request) (*http.Response, error) {
if req.Host == "primary1.foo" {
body, _ := json.Marshal(info1)
if req.URL.Path == "/info/shards" {
priReqs
return buildResponse(http.StatusOK, string(body)), nil
}

h := req.Header
u, _ := url.Parse("http://secondary1.foo")
h.Set("Location", u.String())

return &http.Response{
StatusCode: http.StatusMovedPermanently,
Body: ioutil.NopCloser(bytes.NewBuffer(body)),
Header: h,
}, nil
}
// on a redirect, all headers are empty, somehow, Host is also empty
//...will this work with virtual hosts?
if req.Host == "secondary1.foo" {
secReqs
if req.URL.Path == "/info/shards" {
body, _ := json.Marshal(info2)
return buildResponse(http.StatusOK, string(body)), nil
}
return &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewBufferString("OK")),
Header: make(http.Header),
}, nil
}
return nil, errors.New("Unreachable")
})

client, err := NewHTTPClient(
SetHttpClient(httpClient),
SetAPIKey("my-awesome-api-key"),
SetURLs("http://primary1.foo", "http://secondary1.foo"),
SetReadPreference(PrimaryPreferred),
SetMaxRetries(0),
SetTopologyDiscovery(false),
SetHealthChecks(false),
)
require.NoError(t, err)

// Mark node as dead after NewHTTPClient to simulate a primary failure.
// client.topology.primary.MarkAsDead()

resp, err := client.callPrimary("GET", "/test", nil)
require.NoError(t, err, "The requests should not fail")
require.True(t, len(resp) > 0, "The response should not be empty")
require.Equal(t, 0, priReqs, "The number of requests should match to primary node")
require.Equal(t, 1, secReqs, "The number of requests should match to secondary node")
}

// Having 2 nodes with the primary being unreachable, and with
// "discovery" and "healthcheck" options enabled, there should be a leader election and the
// request should go to the new primary.
Expand Down
9 changes: 8 additions & 1 deletion client/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 29,15 @@ func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {

// NewTestHttpClient returns *http.Client with Transport replaced to avoid making real calls
func NewTestHttpClient(fn RoundTripFunc) *http.Client {

checkRedirect := func(req *http.Request, via []*http.Request) error {
req.Host = req.URL.Host
return nil
}

return &http.Client{
Transport: fn,
Transport: fn,
CheckRedirect: checkRedirect,
}
}

Expand Down