-
Notifications
You must be signed in to change notification settings - Fork 243
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #186 from dipjyotimetia/feature/Influxdb
adding support for exporting metrics to influxdb
- Loading branch information
Showing
7 changed files
with
588 additions
and
42 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 1,44 @@ | ||
version: '3.8' | ||
services: | ||
|
||
locust: | ||
depends_on: | ||
- influxdb | ||
build: | ||
context: . | ||
dockerfile: ./locust/Dockerfile | ||
ports: | ||
- "8089:8089" | ||
- "5557:5557" | ||
command: | ||
[ | ||
"--master","-H","http://0.0.0.0:8089", | ||
"--headless","--expect-workers","1", | ||
"--stop-timeout","30", | ||
"--run-time","60s", | ||
"--spawn-rate","100", | ||
"--users","200" | ||
] | ||
networks: | ||
- locust_net | ||
|
||
influxdb: | ||
image: influxdb:latest | ||
volumes: | ||
- ./influxdb_data:/var/lib/influxdb2 | ||
ports: | ||
- "8086:8086" | ||
expose: | ||
- 8086 | ||
environment: | ||
- DOCKER_INFLUXDB_INIT_MODE=setup | ||
- DOCKER_INFLUXDB_INIT_USERNAME=my-user | ||
- DOCKER_INFLUXDB_INIT_PASSWORD=my-password | ||
- DOCKER_INFLUXDB_INIT_ORG=my-org | ||
- DOCKER_INFLUXDB_INIT_BUCKET=my-bucket | ||
- DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-auth-token | ||
networks: | ||
- locust_net | ||
|
||
networks: | ||
locust_net: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 1,5 @@ | ||
FROM locustio/locust:2.14.1 | ||
WORKDIR /home/locust | ||
ADD . . | ||
ADD locust/* /home/locust | ||
CMD ["locust"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 1,8 @@ | ||
# coding: utf8 | ||
|
||
from locust import User, task | ||
|
||
class Dummy(User): | ||
@task(20) | ||
def hello(self): | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 1,82 @@ | ||
package main | ||
|
||
import ( | ||
"log" | ||
"os" | ||
"os/signal" | ||
"sync" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/myzhan/boomer" | ||
) | ||
|
||
var ( | ||
host = "http://localhost:8086" | ||
token = "my-super-secret-auth-token" | ||
org = "my-org" | ||
bucket = "my-bucket" | ||
) | ||
|
||
func waitForQuit() { | ||
wg := sync.WaitGroup{} | ||
wg.Add(1) | ||
|
||
quitByMe := false | ||
go func() { | ||
c := make(chan os.Signal, 1) | ||
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) | ||
<-c | ||
quitByMe = true | ||
globalBoomer.Quit() | ||
wg.Done() | ||
}() | ||
|
||
boomer.Events.Subscribe(boomer.EVENT_QUIT, func() { | ||
if !quitByMe { | ||
wg.Done() | ||
} | ||
}) | ||
|
||
wg.Wait() | ||
} | ||
|
||
var globalBoomer = boomer.NewBoomer("localhost", 5557) | ||
|
||
func main() { | ||
log.SetFlags(log.LstdFlags | log.Lshortfile) | ||
|
||
ts := boomer.NewWeighingTaskSet() | ||
|
||
taskA := &boomer.Task{ | ||
Name: "TaskA", | ||
Weight: 10, | ||
Fn: func() { | ||
time.Sleep(100 * time.Millisecond) | ||
globalBoomer.RecordSuccess("task", "A", 100, int64(10)) | ||
}, | ||
} | ||
|
||
taskB := &boomer.Task{ | ||
Name: "TaskB", | ||
Weight: 20, | ||
Fn: func() { | ||
time.Sleep(100 * time.Millisecond) | ||
globalBoomer.RecordSuccess("task", "B", 100, int64(20)) | ||
}, | ||
} | ||
|
||
// Expecting RPS(taskA)/RPS(taskB) to be close to 10/20 | ||
ts.AddTask(taskA) | ||
ts.AddTask(taskB) | ||
|
||
task := &boomer.Task{ | ||
Name: "TaskSet", | ||
Fn: ts.Run, | ||
} | ||
globalBoomer.AddOutput(boomer.NewInfluxOutput(host, token, org, bucket)) | ||
globalBoomer.Run(task) | ||
|
||
waitForQuit() | ||
log.Println("shutdown") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 1,73 @@ | ||
# Implementing influxdb export | ||
|
||
install influxdb dependency | ||
|
||
`go get github.com/influxdata/influxdb-client-go/v2` | ||
|
||
```go | ||
|
||
// InfluxOutput pushes boomer stats to InfluxDB. | ||
type InfluxOutput struct { | ||
influx api.WriteAPI | ||
} | ||
|
||
// NewInfluxOutput returns a InfluxOutput. | ||
func NewInfluxOutput(influxHost, token, org, bucket string) *InfluxOutput { | ||
return &InfluxOutput{ | ||
influxdb2.NewClientWithOptions(influxHost, token, | ||
influxdb2.DefaultOptions(). | ||
SetUseGZip(true). | ||
SetTLSConfig(&tls.Config{InsecureSkipVerify: true})).WriteAPI(org, bucket), | ||
} | ||
} | ||
|
||
// OnStart will start influxdb write api | ||
func (o *InfluxOutput) OnStart() { | ||
log.Println("register influx metric collectors") | ||
} | ||
|
||
// OnStop of InfluxOutput force all unwritten data to be sent | ||
func (o *InfluxOutput) OnStop() { | ||
o.influx.Flush() | ||
} | ||
|
||
func (o *InfluxOutput) OnEvent(data map[string]interface{}) { | ||
eventTime := time.Now() | ||
errorsCh := o.influx.Errors() | ||
go func() { | ||
for err := range errorsCh { | ||
log.Println("could not push to influxdb error: %s\n\n", err.Error()) | ||
} | ||
}() | ||
output, err := convertData(data) | ||
if err != nil { | ||
log.Println(fmt.Sprintf("convert data error: %s", err)) | ||
return | ||
} | ||
|
||
for _, stat := range output.Stats { | ||
method := stat.Method | ||
name := stat.Name | ||
point := influxdb2.NewPoint( | ||
method name, | ||
map[string]string{ | ||
"user_count": string(output.UserCount), | ||
"total_rps": strconv.FormatInt(output.TotalRPS, 10), | ||
}, | ||
map[string]interface{}{ | ||
"num_requests": float64(stat.NumRequests), | ||
"num_failures": float64(stat.NumFailures), | ||
"median_response_time": float64(stat.medianResponseTime), | ||
"avg_response_time": stat.avgResponseTime, | ||
"min_response_time": float64(stat.MinResponseTime), | ||
"max_response_time": float64(stat.MaxResponseTime), | ||
"avg_content_length": float64(stat.avgContentLength), | ||
"current_rps": float64(stat.currentRps), | ||
"current_fail_per_sec": float64(stat.currentFailPerSec), | ||
}, | ||
eventTime) | ||
o.influx.WritePoint(point) | ||
} | ||
} | ||
|
||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.