Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for long log lines #265

Merged
merged 5 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 73,9 @@ var (
ErrGRPCBrokerMuxNotSupported = errors.New("client requested gRPC broker multiplexing but plugin does not support the feature")
)

// defaultPluginLogBufferSize is the default size of the buffer used to read from stderr for plugin log lines.
const defaultPluginLogBufferSize = 64 * 1024

// Client handles the lifecycle of a plugin application. It launches
// plugins, connects to them, dispenses interface implementations, and handles
// killing the process.
Expand Down Expand Up @@ -220,6 223,10 @@ type ClientConfig struct {
// it will default to hclog's default logger.
Logger hclog.Logger

// PluginLogBufferSize is the buffer size(bytes) to read from stderr for plugin log lines.
// If this is 0, then the default of 64KB is used.
PluginLogBufferSize int

// AutoMTLS has the client and server automatically negotiate mTLS for
// transport authentication. This ensures that only the original client will
// be allowed to connect to the server, and all other connections will be
Expand Down Expand Up @@ -416,6 423,10 @@ func NewClient(config *ClientConfig) (c *Client) {
})
}

if config.PluginLogBufferSize == 0 {
config.PluginLogBufferSize = defaultPluginLogBufferSize
}

c = &Client{
config: config,
logger: config.Logger,
Expand Down Expand Up @@ -1146,14 1157,12 @@ func (c *Client) getGRPCMuxer(addr net.Addr) (*grpcmux.GRPCClientMuxer, error) {
return c.grpcMuxer, nil
}

var stdErrBufferSize = 64 * 1024

func (c *Client) logStderr(name string, r io.Reader) {
defer c.clientWaitGroup.Done()
defer c.stderrWaitGroup.Done()
l := c.logger.Named(filepath.Base(name))

reader := bufio.NewReaderSize(r, stdErrBufferSize)
reader := bufio.NewReaderSize(r, c.config.PluginLogBufferSize)
// continuation indicates the previous line was a prefix
continuation := false

Expand Down
60 changes: 54 additions & 6 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 7,7 @@ import (
"bytes"
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -1483,18 1484,13 @@ func testClient_logger(t *testing.T, proto string) {

// Test that we continue to consume stderr over long lines.
func TestClient_logStderr(t *testing.T) {
orig := stdErrBufferSize
stdErrBufferSize = 32
defer func() {
stdErrBufferSize = orig
}()

stderr := bytes.Buffer{}
c := NewClient(&ClientConfig{
Stderr: &stderr,
Cmd: &exec.Cmd{
Path: "test",
},
PluginLogBufferSize: 32,
})
c.clientWaitGroup.Add(1)

Expand All @@ -1515,3 1511,55 @@ this line is short
t.Fatalf("\nexpected output: %q\ngot output: %q", msg, read)
}
}

func TestClient_logStderrParseJSON(t *testing.T) {
logBuf := bytes.Buffer{}
c := NewClient(&ClientConfig{
Stderr: bytes.NewBuffer(nil),
Cmd: &exec.Cmd{Path: "test"},
PluginLogBufferSize: 64,
Logger: hclog.New(&hclog.LoggerOptions{
Name: "test-logger",
Level: hclog.Trace,
Output: &logBuf,
JSONFormat: true,
}),
})
c.clientWaitGroup.Add(1)

msg := `{"@message": "this is a message", "@level": "info"}
{"@message": "this is a large message that is more than 64 bytes long", "@level": "info"}`
reader := strings.NewReader(msg)

c.stderrWaitGroup.Add(1)
c.logStderr(c.config.Cmd.Path, reader)
logs := strings.Split(strings.TrimSpace(logBuf.String()), "\n")

wants := []struct {
wantLevel string
wantMessage string
}{
{"info", "this is a message"},
{"debug", `{"@message": "this is a large message that is more than 64 bytes`},
{"debug", ` long", "@level": "info"}`},
}

if len(logs) != len(wants) {
t.Fatalf("expected %d logs, got %d", len(wants), len(logs))
}

for i, tt := range wants {
l := make(map[string]interface{})
if err := json.Unmarshal([]byte(logs[i]), &l); err != nil {
t.Fatal(err)
}

if l["@level"] != tt.wantLevel {
t.Fatalf("expected level %q, got %q", tt.wantLevel, l["@level"])
}

if l["@message"] != tt.wantMessage {
t.Fatalf("expected message %q, got %q", tt.wantMessage, l["@message"])
}
}
}