art

package module
v0.51.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2024 License: MIT Imports: 24 Imported by: 0

README

art

Features

  • Routes Group
  • Routes Parameter: /users/{user_id}/orders/{order_id}
  • Extendable: middleware support
  • Universal: message-driven architecture, stream processing, etc. Use it for whatever you need.
  • Adapter Lifecycle Management

Installation go package

go get -u github.com/KScaesar/art

Why Create This Package

Simplifying Message Handling in Go.

I believe most Go developers have used the Gin HTTP package; my favorite part of it is the way its message HandleFuncs are written.

This approach not only satisfies the Single Responsibility Principle (SRP) but also utilizes middleware design to enhance the code's extensibility, fulfilling the requirements of the Open-Closed Principle (OCP).

In everyday work, we not only handle HTTP messages but also utilize other backend common messaging methods such as Redis, RabbitMQ, WebSocket, SSE, and Kafka.

Unfortunately, in my work I often encounter code that is difficult to maintain because it is written with basic switch-case or if-else statements.

In Go, the foundational open-source packages for these messaging systems typically don't offer a built-in way to achieve the HandleFunc design pattern.

Therefore, I created this message mux (multiplexer), aiming to establish a message handling pattern similar to Gin's HandleFunc.

Usage example

An example is shown below:

Example

Go Playground

package main

// main demonstrates typical usage of the art message mux: it configures the
// default logger, registers an error handler and middleware, sets a default
// handler, registers two handlers under the "v1/" group, prints the registered
// endpoints, and finally passes the mux to Listen.
//
// HandleAuth, Hello, UpdatedProductPrice and Listen are not part of the art
// package; they are defined outside this excerpt (see the Go Playground link).
func main() {
	art.SetDefaultLogger(art.NewLogger(false, art.LogLevelDebug))

	// The delimiter separates the parts of a subject, e.g. "v1/Hello/{user}".
	routeDelimiter := "/"
	mux := art.NewMux(routeDelimiter)

	mux.ErrorHandler(art.UsePrintResult{}.PrintIngress().PostMiddleware())

	// Note:
	// Before registering handler, middleware must be defined;
	// otherwise, the handler won't be able to use middleware.
	mux.Middleware(
		art.UseRecover(),
		art.UsePrintDetail().
			Link(art.UseExclude([]string{"RegisterUser"})).
			PostMiddleware(),
		art.UseLogger(true, art.SafeConcurrency_Skip),
		art.UseHowMuchTime(),
		art.UseAdHocFunc(func(message *art.Message, dep any) error {
			// CtxGetLogger takes only the context in this version of the
			// package; dep is not needed to obtain the logger.
			logger := art.CtxGetLogger(message.Ctx)
			logger.Info("    >> recv %q <<", message.Subject)
			return nil
		}).PreMiddleware(),
	)

	// When a subject cannot be found, execute the 'Default'
	mux.DefaultHandler(art.UseSkipMessage())

	// Handlers registered on v1 get the "v1/" subject prefix
	// (see the endpoint listing below).
	v1 := mux.Group("v1/").Middleware(HandleAuth().PreMiddleware())

	v1.Handler("Hello/{user}", Hello)

	db := make(map[string]any)
	v1.Handler("UpdatedProductPrice/{brand}", UpdatedProductPrice(db))

	// Endpoints:
	// [art] subject=".*"                                f="main.main.UseSkipMessage.func11"
	// [art] subject="v1/Hello/{user}"                   f="main.Hello"
	// [art] subject="v1/UpdatedProductPrice/{brand}"    f="main.main.UpdatedProductPrice.func14"
	mux.Endpoints(func(subject, fn string) { fmt.Printf("[art] subject=%-35q f=%q\n", subject, fn) })

	intervalSecond := 2
	Listen(mux, intervalSecond)
}

Advanced usage

The code-generation CLI is used to generate template code for message.go and adapter.go.

Modify the template content according to the requirements,
select PubSub, Publisher, or Subscriber as needed, and delete unused code.

go install github.com/KScaesar/art/cmd/art@latest
art gen

or

art gen -dir {Path} -pkg {Package} -f {File}
art -h

help: 
    art gen -dir  ./    -pkg  infra    -f  kafka 
    art gen -dir {Path} -pkg {Package} -f {File} 

-dir  Generate code to dir
-f    File prefix name
-pkg  Package name

Class Diagram

classDiagram
direction RL

namespace HandleMessage {
    class Message {
        <<datatype>>
        + Subject: string
    }
    class Ingress
    class Egress

    class Mux {
        + HandleMessage(Message,Dependency)
        + Register(Subject,Handler)
    }

    class Handler {
        <<interface>>
        + HandleMessage(Message,Dependency)
    }
}

namespace AdapterLayer {
    class IAdapter {
        <<interface>>
        + Identifier() string
        + Stop() 
    }

    class Consumer {
        <<interface>>
    }

    class Producer {
        <<interface>>
    }

    class Hub {
        + Join(ID,IAdapter) error
        + RemoveByKey(ID)
        + DoSync(action func(IAdapter)bool)
        + DoAsync(action func(IAdapter))
    }

    class Adapter {
        - IngressMux: Mux
        - EgressMux: Mux
        - hub: Hub

        + Identifier() string
        + Stop() 
        + Send(Message) error
        + RawSend(Message) error
        + Listen() error
    }

    class AdapterOption {
        + Build() IAdapter
    }
}

namespace artisan_pubsub {
    class KafkaProducer
    class RabbitMqConsumer
    class Websocket
}

    Message <|-- Ingress
    Message <|-- Egress

    Mux --o Adapter: aggregation
    Handler <|.. Mux: implement
    Handler <.. Mux: register
    Message <.. Mux: handle message
    IAdapter .. Mux: dependency is IAdapter

    IAdapter "n" --* "1" Hub: composition
    Adapter <.. AdapterOption: build

    IAdapter <|.. Adapter: implement
    Consumer <|.. Adapter: implement
    Producer <|.. Adapter: implement
    IAdapter <|.. Consumer
    IAdapter <|.. Producer

    AdapterOption <.. KafkaProducer: use
    AdapterOption <.. RabbitMqConsumer: use
    AdapterOption <.. Websocket: use

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrClosed          = NewCustomError(2001, "service has been closed")
	ErrNotFound        = NewCustomError(2100, "not found")
	ErrNotFoundSubject = NewCustomError(2101, "not found subject mux")
)

Functions

func AnyToString

func AnyToString(v any) string

func CtxWithLogger

func CtxWithLogger(ctx context.Context, v Logger) context.Context

func CtxWithPingPong

func CtxWithPingPong(ctx context.Context, v WaitPingPong) context.Context

func ErrorExtractCode

func ErrorExtractCode(err error) int

func ErrorJoin3rdParty

func ErrorJoin3rdParty(myErr error, Err3rd error) error

func ErrorJoin3rdPartyWithMsg

func ErrorJoin3rdPartyWithMsg(myErr error, Err3rd error, msg string, args ...any) error

func ErrorWrapWithMessage

func ErrorWrapWithMessage(myErr error, msg string, args ...any) error

func GenerateRandomCode

func GenerateRandomCode(length int) string

func GenerateUlid

func GenerateUlid() string

func LookupLogLevel

func LookupLogLevel(level LogLevel) string

func PutMessage

func PutMessage(message *Message)

func ReliableTask

func ReliableTask(task func() error, allowStop func() bool, retryMaxSecond int, fixup func() error) error

func SendPingWaitPong

func SendPingWaitPong(sendPingSeconds int, sendPing func() error, waitPong WaitPingPong, isStopped func() bool) error

func SetDefaultLogger

func SetDefaultLogger(l Logger)

func UseSkipMessage

func UseSkipMessage() func(message *Message, dep any) error

func WaitPingSendPong

func WaitPingSendPong(waitPingSeconds int, waitPing WaitPingPong, sendPong func() error, isStop func() bool) error

Types

type Adapter

type Adapter struct {
	// contains filtered or unexported fields
}

func (*Adapter) Identifier

func (adp *Adapter) Identifier() string

func (*Adapter) IsStopped

func (adp *Adapter) IsStopped() bool

func (*Adapter) Listen

func (adp *Adapter) Listen() (err error)

func (*Adapter) Log

func (adp *Adapter) Log() Logger

func (*Adapter) OnDisconnect

func (adp *Adapter) OnDisconnect(terminates ...func(adp IAdapter))

func (*Adapter) RawInfra

func (adp *Adapter) RawInfra() any

func (*Adapter) RawSend

func (adp *Adapter) RawSend(messages ...*Message) error

func (*Adapter) Send

func (adp *Adapter) Send(messages ...*Message) error

func (*Adapter) SetLog

func (adp *Adapter) SetLog(logger Logger)

func (*Adapter) Stop

func (adp *Adapter) Stop() error

func (*Adapter) WaitStop

func (adp *Adapter) WaitStop() chan struct{}

type AdapterHub

type AdapterHub interface {
	Join(adapterId string, adp IAdapter) error
	RemoveOne(filter func(IAdapter) bool)
}

type AdapterOption

type AdapterOption struct {
	// contains filtered or unexported fields
}

func NewAdapterOption

func NewAdapterOption() (opt *AdapterOption)

func (*AdapterOption) AdapterHub

func (opt *AdapterOption) AdapterHub(hub AdapterHub) *AdapterOption

func (*AdapterOption) Build

func (opt *AdapterOption) Build() (adp IAdapter, err error)

func (*AdapterOption) DecorateAdapter

func (opt *AdapterOption) DecorateAdapter(wrap func(adapter IAdapter) (application IAdapter)) *AdapterOption

func (*AdapterOption) EgressMux

func (opt *AdapterOption) EgressMux(mux *Mux) *AdapterOption

func (*AdapterOption) Identifier

func (opt *AdapterOption) Identifier(identifier string) *AdapterOption

func (*AdapterOption) IngressMux

func (opt *AdapterOption) IngressMux(mux *Mux) *AdapterOption

func (*AdapterOption) Lifecycle

func (opt *AdapterOption) Lifecycle(setup func(life *Lifecycle)) *AdapterOption

func (*AdapterOption) Logger

func (opt *AdapterOption) Logger(logger Logger) *AdapterOption

func (*AdapterOption) RawFixup added in v0.49.2

func (opt *AdapterOption) RawFixup(maxRetrySecond int, fixup func(IAdapter) error) *AdapterOption

func (*AdapterOption) RawInfra

func (opt *AdapterOption) RawInfra(infra any) *AdapterOption

func (*AdapterOption) RawRecv added in v0.49.2

func (opt *AdapterOption) RawRecv(recv func(logger Logger) (message *Message, err error)) *AdapterOption

func (*AdapterOption) RawSend added in v0.49.2

func (opt *AdapterOption) RawSend(send func(logger Logger, message *Message) error) *AdapterOption

func (*AdapterOption) RawStop added in v0.49.2

func (opt *AdapterOption) RawStop(stop func(logger Logger) error) *AdapterOption

func (*AdapterOption) SendPing

func (opt *AdapterOption) SendPing(sendPingSeconds int, waitPong WaitPingPong, sendPing func(IAdapter) error) *AdapterOption

SendPing

SendPingWaitPong sends a ping message and waits for the corresponding pong message. SendPeriod = WaitSecond / 2

func (*AdapterOption) WaitPing

func (opt *AdapterOption) WaitPing(waitPingSeconds int, waitPing WaitPingPong, sendPong func(IAdapter) error) *AdapterOption

WaitPing

WaitPingSendPong waits for a ping message and responds with the corresponding pong message. SendPeriod = WaitSecond

type Consumer

type Consumer interface {
	IAdapter
	Listen() (err error)
}

type CustomError

type CustomError struct {
	// contains filtered or unexported fields
}

func NewCustomError

func NewCustomError(myCode int, title string) *CustomError

func (*CustomError) CustomError

func (c *CustomError) CustomError()

func (*CustomError) Error

func (c *CustomError) Error() string

func (*CustomError) MyCode

func (c *CustomError) MyCode() int

type HandleFunc

type HandleFunc func(message *Message, dep any) error
func Link(handler HandleFunc, middlewares ...Middleware) HandleFunc

func UseAdHocFunc

func UseAdHocFunc(AdHoc HandleFunc) HandleFunc

func UsePrintDetail

func UsePrintDetail() HandleFunc
func (h HandleFunc) Link(middlewares ...Middleware) HandleFunc

func (HandleFunc) PostMiddleware

func (h HandleFunc) PostMiddleware() Middleware

func (HandleFunc) PreMiddleware

func (h HandleFunc) PreMiddleware() Middleware

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

func NewHub

func NewHub() *Hub

func (*Hub) Count

func (hub *Hub) Count(filter func(IAdapter) bool) int

Count: if filter returns true, the count is increased.

func (*Hub) DoAsync

func (hub *Hub) DoAsync(action func(IAdapter))

func (*Hub) DoSync

func (hub *Hub) DoSync(action func(IAdapter) (stop bool))

DoSync: if action returns stop=true, the iteration stops.

func (*Hub) FindByKey

func (hub *Hub) FindByKey(key string) (adp IAdapter, found bool)

func (*Hub) FindMulti

func (hub *Hub) FindMulti(filter func(IAdapter) bool) (all []IAdapter, found bool)

If filter returns true, find target

func (*Hub) FindMultiByKey

func (hub *Hub) FindMultiByKey(keys []string) (all []IAdapter, found bool)

func (*Hub) FindOne

func (hub *Hub) FindOne(filter func(IAdapter) bool) (adp IAdapter, found bool)

If filter returns true, find target

func (*Hub) IsShutdown

func (hub *Hub) IsShutdown() bool

func (*Hub) Join

func (hub *Hub) Join(key string, adp IAdapter) error

func (*Hub) RemoveByKey

func (hub *Hub) RemoveByKey(key string)

func (*Hub) RemoveMulti

func (hub *Hub) RemoveMulti(filter func(IAdapter) bool)

If filter returns true, remove target

func (*Hub) RemoveMultiByKey

func (hub *Hub) RemoveMultiByKey(keys []string)

func (*Hub) RemoveOne

func (hub *Hub) RemoveOne(filter func(IAdapter) bool)

If filter returns true, remove target

func (*Hub) SetConcurrencyQty

func (hub *Hub) SetConcurrencyQty(concurrencyQty int)

SetConcurrencyQty: concurrencyQty controls how many tasks can run simultaneously, preventing excessive resource usage and avoiding frequent context switches.

func (*Hub) Shutdown

func (hub *Hub) Shutdown()

func (*Hub) Total

func (hub *Hub) Total() int

func (*Hub) UpdateByOldKey

func (hub *Hub) UpdateByOldKey(oldKey string, update func(IAdapter) (freshKey string)) error

func (*Hub) WaitShutdown

func (hub *Hub) WaitShutdown() chan struct{}

type IAdapter

type IAdapter interface {
	Identifier() string
	Log() Logger
	SetLog(Logger)
	OnDisconnect(terminates ...func(adp IAdapter))
	Stop() error
	IsStopped() bool         // IsStopped is used for polling
	WaitStop() chan struct{} // WaitStop is used for event push
	RawInfra() any
}

type Lifecycle

type Lifecycle struct {
	// contains filtered or unexported fields
}

Lifecycle defines a management mechanism for initializing and terminating an object.

func (*Lifecycle) OnConnect

func (life *Lifecycle) OnConnect(inits ...func(adp IAdapter) error) *Lifecycle

func (*Lifecycle) OnDisconnect

func (life *Lifecycle) OnDisconnect(terminates ...func(adp IAdapter)) *Lifecycle

type LogLevel

type LogLevel uint8
const (
	LogLevelDebug LogLevel = iota + 1
	LogLevelInfo
	LogLevelWarn
	LogLevelError
	LogLevelFatal
)

type Logger

type Logger interface {
	Debug(format string, a ...any)
	Info(format string, a ...any)
	Warn(format string, a ...any)
	Error(format string, a ...any)
	Fatal(format string, a ...any)

	SetLogLevel(level LogLevel)
	LogLevel() LogLevel
	WithCallDepth(externalDepth uint) Logger
	WithKeyValue(key string, v any) Logger
}

func CtxGetLogger

func CtxGetLogger(ctx context.Context) Logger

func DefaultLogger

func DefaultLogger() Logger

func NewLogger

func NewLogger(printPath bool, level LogLevel) Logger

func NewWriterLogger

func NewWriterLogger(w io.Writer, printPath bool, level LogLevel) Logger

func SilentLogger

func SilentLogger() Logger

type Message

type Message struct {
	Subject string

	Bytes []byte
	Body  any

	Mutex sync.Mutex

	// RouteParam are used to capture values from subject.
	// These parameters represent resources or identifiers.
	//
	// Example:
	//
	//	define mux subject = "/users/{id}"
	//	send or recv subject = "/users/1017"
	//
	//	get route param:
	//		key : value => id : 1017
	RouteParam maputil.Data

	Metadata maputil.Data

	RawInfra any

	Ctx context.Context
	// contains filtered or unexported fields
}

func GetMessage

func GetMessage() *Message

func (*Message) Copy

func (msg *Message) Copy() *Message

func (*Message) MsgId

func (msg *Message) MsgId() string

func (*Message) SetMsgId

func (msg *Message) SetMsgId(msgId string)

func (*Message) UpdateContext

func (msg *Message) UpdateContext(updates ...func(ctx context.Context) context.Context) context.Context

type Middleware

type Middleware func(next HandleFunc) HandleFunc

func UseAsync

func UseAsync() Middleware

func UseCopyMessage added in v0.50.0

func UseCopyMessage() Middleware

func UseExclude

func UseExclude(subjects []string) Middleware

func UseHowMuchTime

func UseHowMuchTime() Middleware

func UseInclude

func UseInclude(subjects []string) Middleware

func UseLogger

func UseLogger(withMsgId bool, safeConcurrency SafeConcurrencyKind) Middleware

func UseRecover

func UseRecover() Middleware

func UseRetry

func UseRetry(retryMaxSecond int) Middleware
func (mw Middleware) Link(handler HandleFunc) HandleFunc

type Mux

type Mux struct {
	// contains filtered or unexported fields
}

Mux refers to a router or multiplexer, which can be used to handle different messages. It is itself also a HandleFunc, but with added routing capabilities.

Message represents a high-level abstraction data structure containing metadata (e.g. header) + body

func NewMux

func NewMux(routeDelimiter string) *Mux

NewMux: if routeDelimiter is an empty string, Message.RouteParam cannot be used. routeDelimiter can only be set to a string of length 1. This parameter separates the different parts of Message.Subject.

func (*Mux) DefaultHandler

func (mux *Mux) DefaultHandler(h HandleFunc, mw ...Middleware) *Mux

DefaultHandler When a subject cannot be found, execute the 'Default'.

"The difference between 'Default' and 'NotFound' is that the 'Default' handler will utilize middleware, whereas 'NotFound' won't use middleware."

func (*Mux) EnableMessagePool

func (mux *Mux) EnableMessagePool() *Mux

func (*Mux) Endpoints

func (mux *Mux) Endpoints(action func(subject, handler string))

Endpoints retrieves information about the registered handler functions.

func (*Mux) ErrorHandler

func (mux *Mux) ErrorHandler(errHandlers ...Middleware) *Mux

func (*Mux) Group

func (mux *Mux) Group(groupName string) *Mux

func (*Mux) GroupByNumber

func (mux *Mux) GroupByNumber(groupName int) *Mux

func (*Mux) HandleMessage

func (mux *Mux) HandleMessage(message *Message, dependency any) (err error)

HandleMessage to handle various messages

- route parameter can be nil

func (*Mux) Handler

func (mux *Mux) Handler(subject string, h HandleFunc, mw ...Middleware) *Mux

func (*Mux) HandlerByNumber

func (mux *Mux) HandlerByNumber(subject int, h HandleFunc, mw ...Middleware) *Mux

func (*Mux) Middleware

func (mux *Mux) Middleware(middlewares ...Middleware) *Mux

Middleware Before registering handler, middleware must be defined; otherwise, the handler won't be able to use middleware.

func (*Mux) NotFoundHandler

func (mux *Mux) NotFoundHandler(h HandleFunc) *Mux

NotFoundHandler When a subject cannot be found, execute the 'NotFound'.

"The difference between 'Default' and 'NotFound' is that the 'Default' handler will utilize middleware, whereas 'NotFound' won't use middleware."

func (*Mux) PostMiddleware

func (mux *Mux) PostMiddleware(handleFuncs ...HandleFunc) *Mux

func (*Mux) PreMiddleware

func (mux *Mux) PreMiddleware(handleFuncs ...HandleFunc) *Mux

func (*Mux) Transform

func (mux *Mux) Transform(transform HandleFunc) *Mux

Transform Originally, the message passed through the mux would only call 'getSubject' once. However, if there is a definition of Transform, when the message passes through the Transform function, 'getSubject' will be called again.

type Producer

type Producer interface {
	IAdapter
	Send(messages ...*Message) error
	RawSend(messages ...*Message) error
}

type Prosumer

type Prosumer interface {
	Producer
	Consumer
}

type SafeConcurrencyKind

type SafeConcurrencyKind int
const (
	SafeConcurrency_Skip SafeConcurrencyKind = iota
	SafeConcurrency_Mutex
	SafeConcurrency_Copy
)

type Shutdown

type Shutdown struct {
	Logger Logger
	// contains filtered or unexported fields
}

func NewShutdown

func NewShutdown() *Shutdown

func (*Shutdown) Notify

func (s *Shutdown) Notify(cause error)

func (*Shutdown) Serve

func (s *Shutdown) Serve(ctx context.Context)

func (*Shutdown) StopService

func (s *Shutdown) StopService(name string, action func() error) *Shutdown

func (*Shutdown) WaitFinish

func (s *Shutdown) WaitFinish() chan struct{}

type UsePrintResult

type UsePrintResult struct {
	// contains filtered or unexported fields
}

func (UsePrintResult) IgnoreErrSubjects added in v0.49.0

func (use UsePrintResult) IgnoreErrSubjects(subjects ...string) UsePrintResult

func (UsePrintResult) IgnoreErrors added in v0.49.0

func (use UsePrintResult) IgnoreErrors(errs ...error) UsePrintResult

func (UsePrintResult) IgnoreOkSubjects added in v0.49.0

func (use UsePrintResult) IgnoreOkSubjects(subjects ...string) UsePrintResult

func (UsePrintResult) PostMiddleware added in v0.49.0

func (use UsePrintResult) PostMiddleware() Middleware

func (UsePrintResult) PrintEgress added in v0.49.0

func (use UsePrintResult) PrintEgress() UsePrintResult

func (UsePrintResult) PrintIngress added in v0.49.0

func (use UsePrintResult) PrintIngress() UsePrintResult

type WaitPingPong

type WaitPingPong chan struct{}

func CtxGetPingPong

func CtxGetPingPong(ctx context.Context) WaitPingPong

func NewWaitPingPong

func NewWaitPingPong() WaitPingPong

func (WaitPingPong) Ack

func (wait WaitPingPong) Ack()

Directories

Path Synopsis
cmd
art

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL