You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
78 lines
1.5 KiB
Go
78 lines
1.5 KiB
Go
package pipeline
|
|
|
|
import (
|
|
"git.noahlan.cn/northlan/ngs/internal/message"
|
|
"git.noahlan.cn/northlan/ngs/session"
|
|
"sync"
|
|
)
|
|
|
|
type (
|
|
// Message is the alias of `message.Message`
|
|
Message = message.Message
|
|
|
|
Func func(s *session.Session, msg *message.Message) error
|
|
|
|
Pipeline interface {
|
|
Outbound() Channel
|
|
Inbound() Channel
|
|
}
|
|
|
|
pipeline struct {
|
|
outbound, inbound *pipelineChannel
|
|
}
|
|
|
|
Channel interface {
|
|
PushFront(h Func)
|
|
PushBack(h Func)
|
|
Process(s *session.Session, msg *message.Message) error
|
|
}
|
|
|
|
pipelineChannel struct {
|
|
mu sync.RWMutex
|
|
handlers []Func
|
|
}
|
|
)
|
|
|
|
func New() Pipeline {
|
|
return &pipeline{
|
|
outbound: &pipelineChannel{},
|
|
inbound: &pipelineChannel{},
|
|
}
|
|
}
|
|
|
|
func (p *pipeline) Outbound() Channel { return p.outbound }
|
|
func (p *pipeline) Inbound() Channel { return p.inbound }
|
|
|
|
// PushFront push a function to the front of the pipeline
|
|
func (p *pipelineChannel) PushFront(h Func) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
handlers := make([]Func, len(p.handlers)+1)
|
|
handlers[0] = h
|
|
copy(handlers[1:], p.handlers)
|
|
p.handlers = handlers
|
|
}
|
|
|
|
// PushBack push a function to the end of the pipeline
|
|
func (p *pipelineChannel) PushBack(h Func) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
p.handlers = append(p.handlers, h)
|
|
}
|
|
|
|
// Process message with all pipeline functions
|
|
func (p *pipelineChannel) Process(s *session.Session, msg *message.Message) error {
|
|
p.mu.RLock()
|
|
defer p.mu.RUnlock()
|
|
if len(p.handlers) < 1 {
|
|
return nil
|
|
}
|
|
for _, h := range p.handlers {
|
|
err := h(s, msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|