Skip to content

Commit

Permalink
add async for machinery
Browse files Browse the repository at this point in the history
  • Loading branch information
liangyongxiong committed Oct 17, 2023
1 parent 954703e commit eae8996
Show file tree
Hide file tree
Showing 24 changed files with 1,625 additions and 289 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 71,6 @@ _book
tmp
log
store

### Golang ###
vendor
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 2,17 @@

```sh
# https://go.dev/dl/

# ubuntu
apt install -y golang
apt install -y bison

# centos
yum install -y epel-release
yum install -y golang
yum install -y bison

# https://github.com/moovweb/gvm
wget -c https://raw.githubusercontent.com/moovweb/gvm/master/binscripts/gvm-installer
bash gvm-installer
```
Expand Down Expand Up @@ -58,5 67,15 @@ dlv debug main.go -- --arg1 value1 --arg2 value2
# go project layout

```
https://github.com/golang-standards/project-layout
https://makeoptim.com/golang/standards/project-layout
```
```

# go-swagger

```sh
brew tap go-swagger/go-swagger
brew install go-swagger

swagger serve ./api.json
```
6 changes: 0 additions & 6 deletions TODO.md

This file was deleted.

28 changes: 28 additions & 0 deletions async/call/main.go
Original file line number Diff line number Diff line change
@@ -0,0 1,28 @@
package call

import (
"fmt"
"time"
)

func Add(args ...int64) (int64, error) {
sum := int64(0)
for _, arg := range args {
sum = arg
}
return sum, nil
}

func Multiply(args ...int64) (int64, error) {
sum := int64(1)
for _, arg := range args {
sum *= arg
}
return sum, nil
}

func Cronjob() error {
now := time.Now()
fmt.Println(now.Format("2006=01-02 15:04:05"))
return nil
}
61 changes: 61 additions & 0 deletions async/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 1,61 @@
module async

go 1.20

require github.com/RichardKnop/machinery v1.10.6

require (
cloud.google.com/go v0.110.2 // indirect
cloud.google.com/go/compute v1.20.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.0 // indirect
cloud.google.com/go/pubsub v1.31.0 // indirect
github.com/RichardKnop/logging v0.0.0-20190827224416-1a693bdd4fae // indirect
github.com/aws/aws-sdk-go v1.44.285 // indirect
github.com/bradfitz/gomemcache v0.0.0-20230611145640-acc696258285 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/go-redsync/redsync/v4 v4.8.1 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gomodule/redigo v2.0.0 incompatible // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect
github.com/googleapis/gax-go/v2 v2.11.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/klauspost/compress v1.16.6 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/streadway/amqp v1.1.0 // indirect
github.com/stretchr/testify v1.8.1 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
go.mongodb.org/mongo-driver v1.11.7 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.10.0 // indirect
golang.org/x/net v0.11.0 // indirect
golang.org/x/oauth2 v0.9.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
google.golang.org/api v0.128.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/grpc v1.56.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
963 changes: 963 additions & 0 deletions async/go.sum

Large diffs are not rendered by default.

90 changes: 90 additions & 0 deletions async/main.go
Original file line number Diff line number Diff line change
@@ -0,0 1,90 @@
package main

import (
"fmt"
machinery "github.com/RichardKnop/machinery/v1"
"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/tasks"
"os"
"os/signal"
"syscall"
"time"

"async/call"
)

var cnf = &config.Config{
Broker: "redis://127.0.0.1:6379/0",
DefaultQueue: "default_queue",
ResultBackend: "eager",
//ResultBackend: "redis://127.0.0.1:6379/0",
}

func InitServer() *machinery.Server {
server, err := machinery.NewServer(cnf)
if err != nil {
return nil
}
return server
}

func LaunchWorker() {
server := InitServer()
server.RegisterTask("add", call.Add)
server.RegisterTask("multiply", call.Multiply)
server.RegisterTask("cronjob", call.Cronjob)

server.RegisterPeriodicTask("* * * * *", "period-task", &tasks.Signature{
Name: "cronjob",
})

fmt.Println("worker initing")
worker := server.NewWorker("worker_name", 10)
fmt.Println("worker inited")
err := worker.Launch()
fmt.Println("worker launched")
if err != nil {
fmt.Println(err)
}
}

func SendTask() {
server := InitServer()
signature := &tasks.Signature{
Name: "add",
Args: []tasks.Arg{
{
Type: "int64",
Value: 1,
},
{
Type: "int64",
Value: 1,
},
},
}

asyncResult, err := server.SendTask(signature)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("id=%s, state=%s\n", asyncResult.GetState().TaskUUID, asyncResult.GetState().State)
results, _ := asyncResult.Get(time.Duration(time.Millisecond * 5))
fmt.Printf("id=%s, state=%s\n", asyncResult.GetState().TaskUUID, asyncResult.GetState().State)
for _, result := range results {
fmt.Printf("value=%v\n", result.Interface())
}

}

func main() {
go LaunchWorker()
SendTask()

ch := make(chan os.Signal, 1)
defer close(ch)
signal.Notify(ch, syscall.SIGINT)
<-ch
fmt.Println("catch interrupt signal")
}
57 changes: 23 additions & 34 deletions basic/go.mod
Original file line number Diff line number Diff line change
@@ -1,69 1,58 @@
module basic

go 1.18
go 1.20

require (
github.com/PuerkitoBio/goquery v1.8.0
github.com/gin-gonic/gin v1.7.7
github.com/PuerkitoBio/goquery v1.8.1
github.com/go-redis/redis v6.15.9 incompatible
github.com/go-resty/resty/v2 v2.7.0
github.com/go-sql-driver/mysql v1.6.0
github.com/go-sql-driver/mysql v1.7.1
github.com/jinzhu/gorm v1.9.16
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f
github.com/pkg/errors v0.9.1
github.com/schollz/progressbar/v3 v3.8.6
github.com/sirupsen/logrus v1.8.1
github.com/tidwall/gjson v1.14.1
github.com/xuri/excelize/v2 v2.6.0
go.mongodb.org/mongo-driver v1.9.0
github.com/schollz/progressbar/v3 v3.13.1
github.com/sirupsen/logrus v1.9.3
github.com/tidwall/gjson v1.14.4
github.com/xuri/excelize/v2 v2.7.1
go.mongodb.org/mongo-driver v1.11.7
)

require (
github.com/andybalholm/cascadia v1.3.1 // indirect
github.com/andybalholm/cascadia v1.3.2 // indirect
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.10.1 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.5 // indirect
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jonboulle/clockwork v0.2.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 // indirect
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/mattn/go-sqlite3 v2.0.3 incompatible // indirect
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/onsi/ginkgo v1.12.0 // indirect
github.com/onsi/gomega v1.7.1 // indirect
github.com/richardlehane/mscfb v1.0.4 // indirect
github.com/richardlehane/msoleps v1.0.1 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/richardlehane/msoleps v1.0.3 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/tebeka/strftime v0.1.5 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.1 // indirect
github.com/xdg-go/stringprep v1.0.3 // indirect
github.com/xuri/efp v0.0.0-20220407160117-ad0f7a785be8 // indirect
github.com/xuri/nfp v0.0.0-20220409054826-5e722a1d9e22 // indirect
github.com/xuri/efp v0.0.0-20230422071738-01f4e37c47e9 // indirect
github.com/xuri/nfp v0.0.0-20230503010013-3f38cdbb0b83 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/net v0.0.0-20220418201149-a630d4f3e7a2 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/protobuf v1.28.0 // indirect
golang.org/x/crypto v0.10.0 // indirect
golang.org/x/net v0.11.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/term v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit eae8996

Please sign in to comment.