Skip to content

Commit

Permalink
Merge pull request #47 from ihippik/refactoring_arch
Browse files Browse the repository at this point in the history
Refactoring arch
  • Loading branch information
ihippik authored Sep 20, 2024
2 parents 0dfc288 f00d5a6 commit 997f32e
Show file tree
Hide file tree
Showing 30 changed files with 396 additions and 372 deletions.
4 changes: 2 additions & 2 deletions cmd/wal-listener/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 8,8 @@ import (
"github.com/jackc/pgx"
"github.com/nats-io/nats.go"

"github.com/ihippik/wal-listener/v2/config"
"github.com/ihippik/wal-listener/v2/publisher"
"github.com/ihippik/wal-listener/v2/internal/config"
"github.com/ihippik/wal-listener/v2/internal/publisher"
)

// initPgxConnections initialise db and replication connections.
Expand Down
13 changes: 7 additions & 6 deletions cmd/wal-listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 11,9 @@ import (
scfg "github.com/ihippik/config"
"github.com/urfave/cli/v2"

"github.com/ihippik/wal-listener/v2/config"
"github.com/ihippik/wal-listener/v2/listener"
"github.com/ihippik/wal-listener/v2/internal/config"
"github.com/ihippik/wal-listener/v2/internal/listener"
"github.com/ihippik/wal-listener/v2/internal/listener/transaction"
)

func main() {
Expand Down Expand Up @@ -73,19 74,19 @@ func main() {
}
}()

service := listener.NewWalListener(
svc := listener.NewWalListener(
cfg,
logger,
listener.NewRepository(conn),
rConn,
pub,
listener.NewBinaryParser(logger, binary.BigEndian),
transaction.NewBinaryParser(logger, binary.BigEndian),
config.NewMetrics(),
)

go service.InitHandlers(ctx)
go svc.InitHandlers(ctx)

if err := service.Process(ctx); err != nil {
if err = svc.Process(ctx); err != nil {
slog.Error("service process failed", "err", err.Error())
}

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
30 changes: 19 additions & 11 deletions listener/listener.go → internal/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 2,7 @@ package listener

import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
Expand All @@ -15,8 16,9 @@ import (
"github.com/jackc/pgx"
"golang.org/x/sync/errgroup"

"github.com/ihippik/wal-listener/v2/config"
"github.com/ihippik/wal-listener/v2/publisher"
"github.com/ihippik/wal-listener/v2/internal/config"
tx "github.com/ihippik/wal-listener/v2/internal/listener/transaction"
"github.com/ihippik/wal-listener/v2/internal/publisher"
)

// Logical decoding plugin.
Expand All @@ -27,7 29,7 @@ type eventPublisher interface {
}

type parser interface {
ParseWalMessage([]byte, *WalTransaction) error
ParseWalMessage([]byte, *tx.WAL) error
}

type replication interface {
Expand Down Expand Up @@ -69,6 71,12 @@ type Listener struct {
isAlive atomic.Bool
}

var (
errReplConnectionIsLost = errors.New("replication connection to postgres is lost")
errConnectionIsLost = errors.New("db connection to postgres is lost")
errReplDidNotStart = errors.New("replication did not start")
)

// NewWalListener create and initialize new service instance.
func NewWalListener(
cfg *config.Config,
Expand Down Expand Up @@ -310,7 318,7 @@ func (l *Listener) Stream(ctx context.Context) error {
},
}

tx := NewWalTransaction(l.log, pool, l.monitor)
txWAL := tx.NewWAL(l.log, pool, l.monitor)

for {
if err := ctx.Err(); err != nil {
Expand All @@ -328,29 336,29 @@ func (l *Listener) Stream(ctx context.Context) error {
continue
}

if err = l.processMessage(ctx, msg, tx); err != nil {
if err = l.processMessage(ctx, msg, txWAL); err != nil {
return fmt.Errorf("process message: %w", err)
}

l.processHeartBeat(msg)
}
}

func (l *Listener) processMessage(ctx context.Context, msg *pgx.ReplicationMessage, tx *WalTransaction) error {
func (l *Listener) processMessage(ctx context.Context, msg *pgx.ReplicationMessage, txWAL *tx.WAL) error {
if msg.WalMessage == nil {
l.log.Debug("empty wal-message")
return nil
}

l.log.Debug("WAL message has been received", slog.Uint64("wal", msg.WalMessage.WalStart))

if err := l.parser.ParseWalMessage(msg.WalMessage.WalData, tx); err != nil {
if err := l.parser.ParseWalMessage(msg.WalMessage.WalData, txWAL); err != nil {
l.monitor.IncProblematicEvents(problemKindParse)
return fmt.Errorf("parse: %w", err)
}

if tx.CommitTime != nil {
for event := range tx.CreateEventsWithFilter(ctx, l.cfg.Listener.Filter.Tables) {
if txWAL.CommitTime != nil {
for event := range txWAL.CreateEventsWithFilter(ctx, l.cfg.Listener.Filter.Tables) {
subjectName := event.SubjectName(l.cfg)

if err := l.publisher.Publish(ctx, subjectName, event); err != nil {
Expand All @@ -368,10 376,10 @@ func (l *Listener) processMessage(ctx context.Context, msg *pgx.ReplicationMessa
slog.Uint64("lsn", l.readLSN()),
)

tx.pool.Put(event)
txWAL.RetrieveEvent(event)
}

tx.Clear()
txWAL.Clear()
}

if msg.WalMessage.WalStart > l.readLSN() {
Expand Down
17 changes: 5 additions & 12 deletions listener/listener_test.go → internal/listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 15,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/ihippik/wal-listener/v2/config"
"github.com/ihippik/wal-listener/v2/publisher"
"github.com/ihippik/wal-listener/v2/internal/config"
tx "github.com/ihippik/wal-listener/v2/internal/listener/transaction"
"github.com/ihippik/wal-listener/v2/internal/publisher"
)

var (
Expand Down Expand Up @@ -467,7 468,7 @@ func TestListener_Stream(t *testing.T) {
repo.On("NewStandbyStatus", walPositions).Return(status, err).After(10 * time.Millisecond)
}

setParseWalMessageOnce := func(msg []byte, tx *WalTransaction, err error) {
setParseWalMessageOnce := func(msg []byte, tx *tx.WAL, err error) {
prs.On("ParseWalMessage", msg, tx).Return(err)
}

Expand Down Expand Up @@ -560,15 561,7 @@ func TestListener_Stream(t *testing.T) {

setParseWalMessageOnce(
[]byte(`some bytes`),
&WalTransaction{
monitor: metrics,
log: logger,
LSN: 0,
BeginTime: nil,
CommitTime: nil,
RelationStore: make(map[int32]RelationData),
Actions: nil,
},
tx.NewWAL(logger, nil, metrics),
nil,
)

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 4,27 @@ import (
"time"

"github.com/stretchr/testify/mock"

trx "github.com/ihippik/wal-listener/v2/internal/listener/transaction"
)

type parserMock struct {
mock.Mock
}

func (p *parserMock) ParseWalMessage(msg []byte, tx *WalTransaction) error {
func (p *parserMock) ParseWalMessage(msg []byte, tx *trx.WAL) error {
args := p.Called(msg, tx)
now := time.Now()

tx.BeginTime = &now
tx.CommitTime = &now
tx.Actions = []ActionData{
tx.Actions = []trx.ActionData{
{
Schema: "public",
Table: "users",
Kind: "INSERT",
NewColumns: []Column{
{
name: "id",
value: 1,
valueType: 23,
isKey: true,
},
NewColumns: []trx.Column{
trx.InitColumn(nil, "id", 1, 23, true),
},
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 5,7 @@ import (

"github.com/stretchr/testify/mock"

"github.com/ihippik/wal-listener/v2/publisher"
"github.com/ihippik/wal-listener/v2/internal/publisher"
)

type publisherMock struct {
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
124 changes: 124 additions & 0 deletions internal/listener/transaction/data.go
Original file line number Diff line number Diff line change
@@ -0,0 1,124 @@
package transaction

import (
"log/slog"
"strconv"
"time"

"github.com/goccy/go-json"
"github.com/google/uuid"
)

// ActionKind kind of action on WAL message.
type ActionKind string

// kind of WAL message.
const (
ActionKindInsert ActionKind = "INSERT"
ActionKindUpdate ActionKind = "UPDATE"
ActionKindDelete ActionKind = "DELETE"
)

func (k ActionKind) string() string {
return string(k)
}

// RelationData kind of WAL message data.
type RelationData struct {
Schema string
Table string
Columns []Column
}

// ActionData kind of WAL message data.
type ActionData struct {
Schema string
Table string
Kind ActionKind
OldColumns []Column
NewColumns []Column
}

// Column of the table with which changes occur.
type Column struct {
log *slog.Logger
name string
value any
valueType int
isKey bool
}

// InitColumn create new Column instance with data.s
func InitColumn(log *slog.Logger, name string, value any, valueType int, isKey bool) Column {
return Column{log: log, name: name, value: value, valueType: valueType, isKey: isKey}
}

// AssertValue converts bytes to a specific type depending
// on the type of this data in the database table.
func (c *Column) AssertValue(src []byte) {
var (
val any
err error
)

if src == nil {
c.value = nil
return
}

strSrc := string(src)

const (
timestampLayout = "2006-01-02 15:04:05"
timestampWithTZLayout = "2006-01-02 15:04:05.999999999-07"
)

switch c.valueType {
case BoolOID:
val, err = strconv.ParseBool(strSrc)
case Int2OID, Int4OID:
val, err = strconv.Atoi(strSrc)
case Int8OID:
val, err = strconv.ParseInt(strSrc, 10, 64)
case TextOID, VarcharOID:
val = strSrc
case TimestampOID:
val, err = time.Parse(timestampLayout, strSrc)
case TimestamptzOID:
val, err = time.ParseInLocation(timestampWithTZLayout, strSrc, time.UTC)
case DateOID, TimeOID:
val = strSrc
case UUIDOID:
val, err = uuid.Parse(strSrc)
case JSONBOID:
var m any

if src[0] == '[' {
m = make([]any, 0)
} else {
m = make(map[string]any)
}

err = json.Unmarshal(src, &m)
val = m
default:
c.log.Debug(
"unknown oid type",
slog.Int("pg_type", c.valueType),
slog.String("column_name", c.name),
)

val = strSrc
}

if err != nil {
c.log.Error(
"column data parse error",
slog.String("err", err.Error()),
slog.Int("pg_type", c.valueType),
slog.String("column_name", c.name),
)
}

c.value = val
}
9 changes: 9 additions & 0 deletions internal/listener/transaction/monitor_mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 1,9 @@
package transaction

type monitorMock struct{}

func (m *monitorMock) IncPublishedEvents(subject, table string) {}

func (m *monitorMock) IncFilterSkippedEvents(table string) {}

func (m *monitorMock) IncProblematicEvents(kind string) {}
Loading

0 comments on commit 997f32e

Please sign in to comment.