406 lines
10 KiB
Go
406 lines
10 KiB
Go
// This source code is part of the near-RT RIC (RAN Intelligent Controller)
|
|
// platform project (RICP).
|
|
|
|
// Copyright 2019 Nokia
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
//
|
|
|
|
// This source code is part of the near-RT RIC (RAN Intelligent Controller)
|
|
// platform project (RICP).
|
|
|
|
package dispatcher
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/pkg/errors"
|
|
"reflect"
|
|
"sync"
|
|
"time"
|
|
"xappmock/enums"
|
|
"xappmock/logger"
|
|
"xappmock/models"
|
|
"xappmock/rmr"
|
|
"xappmock/sender"
|
|
)
|
|
|
|
// Id -> Command
|
|
var configuration = make(map[string]*models.JsonCommand)
|
|
|
|
// Rmr Message Id -> Command
|
|
var waitForRmrMessageType = make(map[int]*models.JsonCommand)
|
|
|
|
func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonCommand) error {
|
|
rmrMsgId, err := rmr.MessageIdToUint(rmrMessageToWaitFor)
|
|
|
|
if err != nil {
|
|
return errors.New(fmt.Sprintf("invalid rmr message id: %s", rmrMessageToWaitFor))
|
|
}
|
|
|
|
waitForRmrMessageType[int(rmrMsgId)] = &command
|
|
return nil
|
|
}
|
|
|
|
type Dispatcher struct {
|
|
rmrService *rmr.Service
|
|
processResult models.ProcessResult
|
|
logger *logger.Logger
|
|
jsonSender *sender.JsonSender
|
|
}
|
|
|
|
func (d *Dispatcher) GetProcessResult() models.ProcessResult {
|
|
return d.processResult
|
|
}
|
|
|
|
func New(logger *logger.Logger, rmrService *rmr.Service, jsonSender *sender.JsonSender) *Dispatcher {
|
|
return &Dispatcher{
|
|
rmrService: rmrService,
|
|
logger: logger,
|
|
jsonSender: jsonSender,
|
|
}
|
|
}
|
|
|
|
func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error {
|
|
if len(cmd.Id) == 0 {
|
|
return errors.New(fmt.Sprintf("invalid cmd, no id"))
|
|
}
|
|
configuration[cmd.Id] = &cmd
|
|
return nil
|
|
|
|
// if len(cmd.ReceiveCommandId) == 0 {
|
|
// return nil
|
|
// }
|
|
//
|
|
// return addRmrMessageToWaitFor(cmd.ReceiveCommandId, cmd)
|
|
}
|
|
|
|
func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
|
|
|
|
if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
|
|
now := time.Now()
|
|
d.processResult.StartTime = &now
|
|
}
|
|
|
|
err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
|
|
|
|
if err != nil {
|
|
d.logger.Errorf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
|
|
d.processResult.Err = err
|
|
d.processResult.Stats.SentErrorCount.Inc()
|
|
return err
|
|
}
|
|
|
|
d.processResult.Stats.SentCount.Inc()
|
|
return nil
|
|
}
|
|
|
|
func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
|
|
|
|
if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
|
|
now := time.Now()
|
|
d.processResult.StartTime = &now
|
|
}
|
|
|
|
for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
|
|
|
|
if err != nil {
|
|
d.logger.Errorf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
|
|
d.processResult.Stats.SentErrorCount.Inc()
|
|
continue
|
|
}
|
|
|
|
d.processResult.Stats.SentCount.Inc()
|
|
time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
func getReceiveRmrMessageType(receiveCommandId string) (string, error) {
|
|
command, ok := configuration[receiveCommandId]
|
|
|
|
if !ok {
|
|
return "", errors.New(fmt.Sprintf("invalid receive command id: %s", receiveCommandId))
|
|
}
|
|
|
|
if len(command.RmrMessageType) == 0 {
|
|
return "", errors.New(fmt.Sprintf("missing RmrMessageType for command id: %s", receiveCommandId))
|
|
}
|
|
|
|
return command.RmrMessageType, nil
|
|
}
|
|
|
|
func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
|
|
|
|
defer sendAndReceiveWg.Done()
|
|
var listenAndHandleWg sync.WaitGroup
|
|
|
|
if len(command.ReceiveCommandId) > 0 {
|
|
rmrMessageToWaitFor, err := getReceiveRmrMessageType(command.ReceiveCommandId)
|
|
|
|
if err != nil {
|
|
d.processResult.Err = err
|
|
return
|
|
}
|
|
|
|
err = addRmrMessageToWaitFor(rmrMessageToWaitFor, command)
|
|
|
|
if err != nil {
|
|
d.processResult.Err = err
|
|
return
|
|
}
|
|
|
|
listenAndHandleWg.Add(1)
|
|
go d.listenAndHandle(ctx, &listenAndHandleWg, command)
|
|
}
|
|
|
|
if command.RepeatCount == 0 {
|
|
err := d.sendNoRepeat(command)
|
|
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
} else {
|
|
d.sendWithRepeat(ctx, command)
|
|
}
|
|
|
|
if len(command.ReceiveCommandId) > 0 {
|
|
listenAndHandleWg.Wait()
|
|
}
|
|
}
|
|
|
|
func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
|
|
|
|
defer sendAndReceiveWg.Done()
|
|
|
|
err := addRmrMessageToWaitFor(command.RmrMessageType, command)
|
|
|
|
if err != nil {
|
|
d.processResult.Err = err
|
|
return
|
|
}
|
|
|
|
var listenAndHandleWg sync.WaitGroup
|
|
listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
|
|
d.listenAndHandle(ctx, &listenAndHandleWg, command)
|
|
}
|
|
|
|
func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
|
|
var command models.JsonCommand
|
|
if len(cmd.Id) == 0 {
|
|
return command, errors.New(fmt.Sprintf("invalid command, no id"))
|
|
}
|
|
|
|
command = *cmd
|
|
|
|
conf, ok := configuration[cmd.Id]
|
|
|
|
if ok {
|
|
command = *conf
|
|
mergeConfigurationAndCommand(&command, cmd)
|
|
}
|
|
|
|
return command, nil
|
|
}
|
|
|
|
func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCommand) {
|
|
|
|
command, err := getMergedCommand(cmd)
|
|
|
|
if err != nil {
|
|
d.processResult.Err = err
|
|
return
|
|
}
|
|
|
|
var sendAndReceiveWg sync.WaitGroup
|
|
|
|
commandAction := enums.CommandAction(command.Action)
|
|
|
|
switch commandAction {
|
|
|
|
case enums.SendRmrMessage:
|
|
sendAndReceiveWg.Add(1)
|
|
go d.sendHandler(ctx, &sendAndReceiveWg, command)
|
|
case enums.ReceiveRmrMessage:
|
|
sendAndReceiveWg.Add(1)
|
|
go d.receiveHandler(ctx, &sendAndReceiveWg, command)
|
|
default:
|
|
d.processResult = models.ProcessResult{Err: errors.New(fmt.Sprintf("invalid command action %s", command.Action))}
|
|
return
|
|
}
|
|
|
|
sendAndReceiveWg.Wait()
|
|
}
|
|
|
|
func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) {
|
|
responseCommand, ok := configuration[command.SendCommandId]
|
|
|
|
if !ok {
|
|
return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId))
|
|
}
|
|
|
|
return responseCommand, nil
|
|
}
|
|
|
|
func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
mbuf, err := d.rmrService.RecvMessage()
|
|
|
|
if err != nil {
|
|
d.logger.Errorf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
|
|
d.processResult.Err = err
|
|
d.processResult.Stats.ReceivedErrorCount.Inc()
|
|
return
|
|
}
|
|
|
|
if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
|
|
now := time.Now()
|
|
d.processResult.StartTime = &now
|
|
}
|
|
|
|
messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
|
|
|
|
_, ok := waitForRmrMessageType[mbuf.MType]
|
|
|
|
if !ok {
|
|
d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
|
|
d.processResult.Stats.ReceivedUnexpectedCount.Inc()
|
|
continue
|
|
}
|
|
|
|
d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
|
|
d.processResult.Stats.ReceivedExpectedCount.Inc()
|
|
|
|
if len(command.SendCommandId) > 0 {
|
|
responseCommand, err := getResponseCommand(command)
|
|
|
|
if err != nil {
|
|
d.processResult.Err = err
|
|
return
|
|
}
|
|
|
|
_ = d.sendNoRepeat(*responseCommand)
|
|
}
|
|
|
|
return
|
|
}
|
|
}
|
|
|
|
func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
|
|
|
|
var responseCommand *models.JsonCommand
|
|
|
|
if len(command.SendCommandId) > 0 {
|
|
var err error
|
|
responseCommand, err = getResponseCommand(command)
|
|
|
|
if err != nil {
|
|
d.processResult.Err = err
|
|
return
|
|
}
|
|
}
|
|
|
|
for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
mbuf, err := d.rmrService.RecvMessage()
|
|
|
|
if err != nil {
|
|
d.logger.Errorf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
|
|
d.processResult.Stats.ReceivedErrorCount.Inc()
|
|
continue
|
|
}
|
|
|
|
if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
|
|
now := time.Now()
|
|
d.processResult.StartTime = &now
|
|
}
|
|
|
|
messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
|
|
|
|
_, ok := waitForRmrMessageType[mbuf.MType]
|
|
|
|
if !ok {
|
|
d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
|
|
d.processResult.Stats.ReceivedUnexpectedCount.Inc()
|
|
continue
|
|
}
|
|
|
|
d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
|
|
d.processResult.Stats.ReceivedExpectedCount.Inc()
|
|
|
|
if responseCommand != nil {
|
|
_ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, command models.JsonCommand) {
|
|
|
|
defer listenAndHandleWg.Done()
|
|
|
|
if command.RepeatCount == 0 {
|
|
d.listenAndHandleNoRepeat(ctx, command)
|
|
return
|
|
}
|
|
|
|
d.listenAndHandleWithRepeat(ctx, command)
|
|
}
|
|
|
|
func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {
|
|
nFields := reflect.Indirect(reflect.ValueOf(cmd)).NumField()
|
|
|
|
for i := 0; i < nFields; i++ {
|
|
if fieldValue := reflect.Indirect(reflect.ValueOf(cmd)).Field(i); fieldValue.IsValid() {
|
|
switch fieldValue.Kind() {
|
|
case reflect.String:
|
|
if fieldValue.Len() > 0 {
|
|
reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
|
|
}
|
|
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
|
|
if fieldValue.Int() != 0 {
|
|
reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
|
|
}
|
|
case reflect.Bool:
|
|
if fieldValue.Bool() {
|
|
reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
|
|
}
|
|
case reflect.Float64, reflect.Float32:
|
|
if fieldValue.Float() != 0 {
|
|
reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
|
|
}
|
|
default:
|
|
reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
|
|
}
|
|
}
|
|
}
|
|
}
|