package pipeline import ( "git.noahlan.cn/noahlan/nnet/entity" "sync" ) type ( Func func(entity entity.NetworkEntity, v interface{}) error // Pipeline 消息管道 Pipeline interface { Outbound() Channel Inbound() Channel } pipeline struct { outbound, inbound *pipelineChannel } Channel interface { PushFront(h Func) PushBack(h Func) Process(entity entity.NetworkEntity, v interface{}) error } pipelineChannel struct { mu sync.RWMutex handlers []Func } ) func New() Pipeline { return &pipeline{ outbound: &pipelineChannel{}, inbound: &pipelineChannel{}, } } func (p *pipeline) Outbound() Channel { return p.outbound } func (p *pipeline) Inbound() Channel { return p.inbound } // PushFront 将func压入slice首位 func (p *pipelineChannel) PushFront(h Func) { p.mu.Lock() defer p.mu.Unlock() handlers := make([]Func, len(p.handlers)+1) handlers[0] = h copy(handlers[1:], p.handlers) p.handlers = handlers } // PushBack 将func压入slice末位 func (p *pipelineChannel) PushBack(h Func) { p.mu.Lock() defer p.mu.Unlock() p.handlers = append(p.handlers, h) } // Process 处理所有的pipeline方法 func (p *pipelineChannel) Process(entity entity.NetworkEntity, v interface{}) error { if len(p.handlers) < 1 { return nil } p.mu.RLock() defer p.mu.RUnlock() for _, handler := range p.handlers { err := handler(entity, v) if err != nil { return err } } return nil }