-
Notifications
You must be signed in to change notification settings - Fork 18
/
main.go
99 lines (82 loc) · 1.96 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package main
import (
"io/ioutil"
"log"
"os"
"plugin"
"strings"
"github.com/cswank/kcli/internal/kafka"
"github.com/cswank/kcli/internal/views"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
// Command-line flags, registered with kingpin, plus the handle of the
// optional debug log file. The flag pointers are filled in by
// kingpin.Parse, which init calls.
var (
// addrs is the value of --addresses/-a (default "localhost:9092"). Each
// value may itself be a comma separated list; getAddresses flattens it.
addrs = kingpin.Flag("addresses", "comma separated list of kafka addresses").Default("localhost:9092").Short('a').Strings()
// logout is the value of --log/-l: the path of the file that log output is
// written to. When it is empty, setLogout discards log output instead.
logout = kingpin.Flag("log", "for debugging, set the log output to a file").Short('l').String()
// topic is the value of --topic/-t; it is handed to views.NewGui.
topic = kingpin.Flag("topic", "go directly to a topic").Short('t').String()
// partition is the value of --partition/-p (default -1); it is handed to
// views.NewGui. NOTE(review): -1 presumably means "no partition selected" —
// confirm against views.NewGui.
partition = kingpin.Flag("partition", "go directly to a partition of a topic").Short('p').Default("-1").Int()
// offset is the value of --offset/-o (default -1); it is handed to
// views.NewGui. NOTE(review): -1 presumably means "no offset selected" —
// confirm against views.NewGui.
offset = kingpin.Flag("offset", "go directly to a message").Short('o').Default("-1").Int()
// decoder is the value of --decoder/-d: the path of a Go plugin. When it is
// non-empty, connect loads the plugin through getDecoder.
decoder = kingpin.Flag("decoder", "path to a plugin to decode kafka messages").Short('d').String()
// f is the log file created by setLogout. It stays nil unless --log was
// given, and main closes it once the GUI has returned.
f *os.File
)
// init parses the command line so that the package-level flag variables are
// populated before main starts.
func init() {
kingpin.Parse()
}
// main connects to kafka, configures logging, and runs the terminal UI until
// it returns. A non-nil error from the UI is reported on stderr and the
// process exits through log.Fatal.
//
// connect runs before setLogout, so connection failures are still written to
// the default log output rather than to the log file or the discard writer.
func main() {
	cli := connect()
	setLogout()

	err := views.NewGui(cli, *topic, *partition, *offset)

	// setLogout redirected log output either to the log file or, when no log
	// file was requested, to a discard writer. Restore stderr unconditionally:
	// if it were only restored when a log file exists, the log.Fatal below
	// would be swallowed by the discard writer and the program would exit
	// without telling the user why.
	log.SetOutput(os.Stderr)

	if f != nil {
		if cerr := f.Close(); cerr != nil {
			log.Printf("closing log file: %v", cerr)
		}
	}

	if err != nil {
		log.Fatal(err)
	}
}
// connect creates the kafka client for the addresses given on the command
// line. When a decoder plugin path was supplied, the plugin is loaded and
// passed to the client as an option. If the client cannot be created the
// process exits through log.Fatal.
func connect() *kafka.Client {
	var options []kafka.Opt
	if path := *decoder; path != "" {
		options = append(options, kafka.WithDecoder(getDecoder(path)))
	}

	client, err := kafka.New(getAddresses(*addrs), options...)
	if err != nil {
		log.Fatal(err)
	}
	return client
}
// setLogout decides where the standard logger writes. Without the log flag
// all log output is discarded; with it, the named file is created, stored in
// the package-level f, and used as the log destination. A failure to create
// the file ends the process through log.Fatal.
func setLogout() {
	if *logout == "" {
		log.SetOutput(ioutil.Discard)
		return
	}

	// Plain assignment (not :=) so the package-level f is set, not shadowed.
	var err error
	if f, err = os.Create(*logout); err != nil {
		log.Fatal(err)
	}
	log.SetOutput(f)
}
// getAddresses flattens the values of the addresses flag into a list of
// individual addresses. Every value is split on commas; surrounding
// whitespace is trimmed from each piece and empty pieces are dropped, so
// input such as "a:9092, b:9092" or "a:9092," does not yield malformed or
// empty addresses. The result is nil when no address remains.
func getAddresses(addrs []string) []string {
	var out []string
	for _, addr := range addrs {
		for _, part := range strings.Split(addr, ",") {
			if part = strings.TrimSpace(part); part != "" {
				out = append(out, part)
			}
		}
	}
	return out
}
// getDecoder opens the Go plugin at pth, looks up its "Decoder" symbol, and
// returns that symbol as a kafka.Decoder. Any failure (the plugin cannot be
// opened, the symbol is missing, or the symbol is not a kafka.Decoder) ends
// the process through the log package's Fatal functions.
func getDecoder(pth string) kafka.Decoder {
	p, err := plugin.Open(pth)
	if err != nil {
		log.Fatal(err)
	}

	sym, err := p.Lookup("Decoder")
	if err != nil {
		log.Fatal(err)
	}

	d, isDecoder := sym.(kafka.Decoder)
	if !isDecoder {
		log.Fatalf("unexpected type from module symbol")
	}
	return d
}