First commit

This commit is contained in:
Leonardo Bonati
2021-12-08 20:17:46 +00:00
commit 60dffad583
2923 changed files with 463894 additions and 0 deletions

View File

@@ -0,0 +1,49 @@
/*
==================================================================================
Copyright (c) 2019-2020 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==================================================================================
*/
/*
* a1_policy.hpp
*
* Created on: Mar, 2020
* Author: Shraboni Jana
*/
#ifndef SRC_XAPP_MGMT_A1MSG_A1_POLICY_HELPER_HPP_
#define SRC_XAPP_MGMT_A1MSG_A1_POLICY_HELPER_HPP_
#include <rapidjson/document.h>
#include <rapidjson/writer.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/schema.h>
using namespace rapidjson;
typedef struct a1_policy_helper a1_policy_helper;
struct a1_policy_helper{
std::string operation;
std::string policy_type_id;
std::string policy_instance_id;
std::string handler_id;
std::string status;
};
#endif /* SRC_XAPP_FORMATS_A1MSG_A1_POLICY_HELPER_HPP_ */

View File

@@ -0,0 +1,49 @@
/*
==================================================================================
Copyright (c) 2018-2019 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==================================================================================
*/
/*
* a1_policy.hpp
*
* Created on: Mar, 2020
* Author: Shraboni Jana
*/
#ifndef SRC_XAPP_MGMT_A1MSG_A1_POLICY_HELPER_HPP_
#define SRC_XAPP_MGMT_A1MSG_A1_POLICY_HELPER_HPP_
#include <rapidjson/document.h>
#include <rapidjson/writer.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/schema.h>
using namespace rapidjson;
typedef struct a1_policy_helper a1_policy_helper;
struct a1_policy_helper{
std::string operation;
std::string policy_type_id;
std::string policy_instance_id;
std::string handler_id;
std::string status;
};
#endif /* SRC_XAPP_FORMATS_A1MSG_A1_POLICY_HELPER_HPP_ */

View File

@@ -0,0 +1,293 @@
/*
==================================================================================
Copyright (c) 2019-2020 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==================================================================================
*/
/*
* msgs_proc.cc
* Created on: 2019
* Author: Ashwin Shridharan, Shraboni Jana
*/
#include "msgs_proc.hpp"
#include <stdio.h>
bool XappMsgHandler::encode_subscription_delete_request(unsigned char* buffer, size_t *buf_len){
subscription_helper sub_helper;
sub_helper.set_request(0); // requirement of subscription manager ... ?
sub_helper.set_function_id(0);
subscription_delete e2ap_sub_req_del;
// generate the delete request pdu
bool res = e2ap_sub_req_del.encode_e2ap_subscription(&buffer[0], buf_len, sub_helper);
if(! res){
mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription delete request pdu. Reason = %s", __FILE__, __LINE__, e2ap_sub_req_del.get_error().c_str());
return false;
}
return true;
}
bool XappMsgHandler::decode_subscription_response(unsigned char* data_buf, size_t data_size){
subscription_helper subhelper;
subscription_response subresponse;
bool res = true;
E2AP_PDU_t *e2pdu = 0;
asn_dec_rval_t rval;
ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, e2pdu);
rval = asn_decode(0,ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void**)&e2pdu, data_buf, data_size);
switch(rval.code)
{
case RC_OK:
//Put in Subscription Response Object.
//asn_fprint(stdout, &asn_DEF_E2AP_PDU, e2pdu);
break;
case RC_WMORE:
mdclog_write(MDCLOG_ERR, "RC_WMORE");
res = false;
break;
case RC_FAIL:
mdclog_write(MDCLOG_ERR, "RC_FAIL");
res = false;
break;
default:
break;
}
ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2pdu);
return res;
}
bool XappMsgHandler::a1_policy_handler(char * message, int *message_len, a1_policy_helper &helper){
rapidjson::Document doc;
if (doc.Parse(message).HasParseError()){
mdclog_write(MDCLOG_ERR, "Error: %s, %d :: Could not decode A1 JSON message %s\n", __FILE__, __LINE__, message);
return false;
}
//Extract Operation
rapidjson::Pointer temp1("/operation");
rapidjson::Value * ref1 = temp1.Get(doc);
if (ref1 == NULL){
mdclog_write(MDCLOG_ERR, "Error : %s, %d:: Could not extract policy type id from %s\n", __FILE__, __LINE__, message);
return false;
}
helper.operation = ref1->GetString();
// Extract policy id type
rapidjson::Pointer temp2("/policy_type_id");
rapidjson::Value * ref2 = temp2.Get(doc);
if (ref2 == NULL){
mdclog_write(MDCLOG_ERR, "Error : %s, %d:: Could not extract policy type id from %s\n", __FILE__, __LINE__, message);
return false;
}
//helper.policy_type_id = ref2->GetString();
helper.policy_type_id = to_string(ref2->GetInt());
// Extract policy instance id
rapidjson::Pointer temp("/policy_instance_id");
rapidjson::Value * ref = temp.Get(doc);
if (ref == NULL){
mdclog_write(MDCLOG_ERR, "Error : %s, %d:: Could not extract policy type id from %s\n", __FILE__, __LINE__, message);
return false;
}
helper.policy_instance_id = ref->GetString();
if (helper.policy_type_id == "1" && helper.operation == "CREATE"){
helper.status = "OK";
Document::AllocatorType& alloc = doc.GetAllocator();
Value handler_id;
handler_id.SetString(helper.handler_id.c_str(), helper.handler_id.length(), alloc);
Value status;
status.SetString(helper.status.c_str(), helper.status.length(), alloc);
doc.AddMember("handler_id", handler_id, alloc);
doc.AddMember("status",status, alloc);
doc.RemoveMember("operation");
StringBuffer buffer;
Writer<StringBuffer> writer(buffer);
doc.Accept(writer);
strncpy(message,buffer.GetString(), buffer.GetLength());
*message_len = buffer.GetLength();
return true;
}
return false;
}
//For processing received messages.XappMsgHandler should mention if resend is required or not.
void XappMsgHandler::operator()(rmr_mbuf_t *message, bool *resend){
if (message->len > MAX_RMR_RECV_SIZE){
mdclog_write(MDCLOG_ERR, "Error : %s, %d, RMR message larger than %d. Ignoring ...", __FILE__, __LINE__, MAX_RMR_RECV_SIZE);
return;
}
a1_policy_helper helper;
bool res=false;
switch(message->mtype){
//need to fix the health check.
case (RIC_HEALTH_CHECK_REQ):
message->mtype = RIC_HEALTH_CHECK_RESP; // if we're here we are running and all is ok
message->sub_id = -1;
strncpy( (char*)message->payload, "HELLOWORLD OK\n", rmr_payload_size( message) );
*resend = true;
break;
case (RIC_INDICATION): {
mdclog_write(MDCLOG_INFO, "Received RIC indication message of type = %d", message->mtype);
unsigned char *me_id_null;
unsigned char *me_id = rmr_get_meid(message, me_id_null);
mdclog_write(MDCLOG_INFO,"RMR Received MEID: %s",me_id);
process_ric_indication(message->mtype, me_id, message->payload, message->len);
break;
}
case (RIC_SUB_RESP): {
mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype);
unsigned char *me_id_null;
unsigned char *me_id = rmr_get_meid(message, me_id_null);
mdclog_write(MDCLOG_INFO,"RMR Received MEID: %s",me_id);
if(_ref_sub_handler !=NULL){
_ref_sub_handler->manage_subscription_response(message->mtype, me_id, message->payload, message->len);
} else {
mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
}
*resend = false;
break;
}
case A1_POLICY_REQ:
mdclog_write(MDCLOG_INFO, "In Message Handler: Received A1_POLICY_REQ.");
helper.handler_id = xapp_id;
res = a1_policy_handler((char*)message->payload, &message->len, helper);
if(res){
message->mtype = A1_POLICY_RESP; // if we're here we are running and all is ok
message->sub_id = -1;
*resend = true;
}
break;
default:
{
mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype);
*resend = false;
}
}
return;
};
void process_ric_indication(int message_type, transaction_identifier id, const void *message_payload, size_t message_len) {
std::cout << "In Process RIC indication" << std::endl;
std::cout << "ID " << id << std::endl;
// decode received message payload
E2AP_PDU_t *pdu = nullptr;
auto retval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu, message_payload, message_len);
// print decoded payload
if (retval.code == RC_OK) {
char *printBuffer;
size_t size;
FILE *stream = open_memstream(&printBuffer, &size);
asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
mdclog_write(MDCLOG_DEBUG, "Decoded E2AP PDU: %s", printBuffer);
uint8_t res = procRicIndication(pdu, id);
}
else {
std::cout << "process_ric_indication, retval.code " << retval.code << std::endl;
}
}
/**
* Handle RIC indication
* TODO doxy
*/
uint8_t procRicIndication(E2AP_PDU_t *e2apMsg, transaction_identifier gnb_id)
{
uint8_t idx;
uint8_t ied;
uint8_t ret = RC_OK;
uint32_t recvBufLen;
RICindication_t *ricIndication;
RICaction_ToBeSetup_ItemIEs_t *actionItem;
printf("\nE2AP : RIC Indication received");
ricIndication = &e2apMsg->choice.initiatingMessage->value.choice.RICindication;
printf("protocolIEs elements %d\n", ricIndication->protocolIEs.list.count);
for (idx = 0; idx < ricIndication->protocolIEs.list.count; idx++)
{
switch(ricIndication->protocolIEs.list.array[idx]->id)
{
case 28: // RIC indication type
{
long ricindicationType = ricIndication->protocolIEs.list.array[idx]-> \
value.choice.RICindicationType;
printf("ricindicationType %ld\n", ricindicationType);
break;
}
case 26: // RIC indication message
{
int payload_size = ricIndication->protocolIEs.list.array[idx]-> \
value.choice.RICindicationMessage.size;
char* payload = (char*) calloc(payload_size, sizeof(char));
memcpy(payload, ricIndication->protocolIEs.list.array[idx]-> \
value.choice.RICindicationMessage.buf, payload_size);
printf("Payload %s\n", payload);
// send payload to agent
std::string agent_ip = find_agent_ip_from_gnb(gnb_id);
send_socket(payload, agent_ip);
break;
}
}
}
return ret; // TODO update ret value in case of errors
}

View File

@@ -0,0 +1,74 @@
/*
==================================================================================
Copyright (c) 2019-2020 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==================================================================================
*/
/*
* msgs_proc.hpp
* Created on: 2019
* Author: Ashwin Shridharan, Shraboni Jana
*/
#pragma once
#ifndef XAPP_MSG_XAPP_MSG_HPP_
#define XAPP_MSG_XAPP_MSG_HPP_
#include <iostream>
#include<rmr/rmr.h>
#include <rmr/RIC_message_types.h>
#include <mdclog/mdclog.h>
#include "a1_helper.hpp"
#include "e2ap_control.hpp"
#include "e2ap_control_response.hpp"
#include "e2ap_indication.hpp"
#include "subscription_delete_request.hpp"
#include "subscription_delete_response.hpp"
#include "subscription_helper.hpp"
#include "subscription_request.hpp"
#include "subscription_request.hpp"
#include "subscription_response.hpp"
#include "e2sm_subscription.hpp"
#include "subs_mgmt.hpp"
#include "../agent_connector.hpp"
#define MAX_RMR_RECV_SIZE 2<<15
class XappMsgHandler{
private:
std::string xapp_id;
SubscriptionHandler *_ref_sub_handler;
public:
//constructor for xapp_id.
XappMsgHandler(std::string xid){xapp_id=xid; _ref_sub_handler=NULL;};
XappMsgHandler(std::string xid, SubscriptionHandler &subhandler){xapp_id=xid; _ref_sub_handler=&subhandler;};
void operator() (rmr_mbuf_t *, bool*);
void register_handler();
bool encode_subscription_delete_request(unsigned char*, size_t* );
bool decode_subscription_response(unsigned char*, size_t );
bool a1_policy_handler(char *, int* , a1_policy_helper &);
void testfunction() {std::cout << "<<<<<<<<<<<<<<<<<<IN TEST FUNCTION<<<<<<<<<<<<<<<" << std::endl;}
};
void process_ric_indication(int message_type, transaction_identifier id, const void *message_payload, size_t message_len);
uint8_t procRicIndication(E2AP_PDU_t *e2apMsg, transaction_identifier gnb_id);
#endif /* XAPP_MSG_XAPP_MSG_HPP_ */

View File

@@ -0,0 +1,177 @@
/*
==================================================================================
Copyright (c) 2019-2020 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==================================================================================
*/
/*
* subs_mgmt.cc
* Created on: 2019
* Author: Ashwin Shridharan, Shraboni Jana
*/
#include "subs_mgmt.hpp"
#include <errno.h>
#include <stdio.h>
#include <string.h>
SubscriptionHandler::SubscriptionHandler(unsigned int timeout_seconds):_time_out(std::chrono::seconds(timeout_seconds)){
_data_lock = std::make_unique<std::mutex>();
_cv = std::make_unique<std::condition_variable>();
};
void SubscriptionHandler::clear(void){
{
std::lock_guard<std::mutex> lock(*(_data_lock).get());
status_table.clear();
}
};
bool SubscriptionHandler::add_request_entry(transaction_identifier id, transaction_status status){
// add entry in hash table if it does not exist
//auto search = status_table.find(id);
//if(search != status_table.end()){
if(transaction_present(status_table, id)){
return false;
}
std::cout << "add_request_entry " << id << std::endl;
status_table[id] = status;
return true;
};
bool SubscriptionHandler::delete_request_entry(transaction_identifier id){
auto search = status_table.find(id);
if (!trans_table.empty()) {
auto search2 = trans_table.find(id);
if(search2 !=trans_table.end()){
trans_table.erase(search2);
}
}
if (search != status_table.end()){
status_table.erase(search);
mdclog_write(MDCLOG_INFO,"Entry for Transaction ID deleted: %s", id);
return true;
}
mdclog_write(MDCLOG_INFO,"Entry not found in SubscriptionHandler for Transaction ID: %d",id);
return false;
};
bool SubscriptionHandler::set_request_status(transaction_identifier id, transaction_status status){
// change status of a request only if it exists.
//auto search = status_table.find(id);
if(transaction_present(status_table, id)){
status_table[id] = status;
std::cout << "set_request_status " << id << " to status " << status << std::endl;
return true;
}
return false;
};
int const SubscriptionHandler::get_request_status(transaction_identifier id){
//auto search = status_table.find(id);
auto search = find_transaction(status_table, id);
std::cout << "get_request_status " << id << std::endl;
if (search == status_table.end()){
return -1;
}
return search->second;
}
bool SubscriptionHandler::is_request_entry(transaction_identifier id){
std::cout << "is_request_entry, looking for key: " << id << std::endl;
if (transaction_present(status_table, id)) {
std::cout << "Key found" << std::endl;
return true;
}
else {
std::cout << "Key NOT found" << std::endl;
return false;
}
}
// Handles subscription responses
void SubscriptionHandler::manage_subscription_response(int message_type, transaction_identifier id, const void *message_payload, size_t message_len){
bool res;
std::cout << "In Manage subscription" << std::endl;
// wake up all waiting users ...
if(is_request_entry(id)){
std::cout << "In Manage subscription, inside if loop" << std::endl;
std::cout << "Setting to request_success: " << request_success << std::endl;
set_request_status(id, request_success);
_cv.get()->notify_all();
}
// decode received message payload
E2AP_PDU_t *pdu = nullptr;
auto retval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu, message_payload, message_len);
// print decoded payload
if (retval.code == RC_OK) {
char *printBuffer;
size_t size;
FILE *stream = open_memstream(&printBuffer, &size);
asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
mdclog_write(MDCLOG_DEBUG, "Decoded E2AP PDU: %s", printBuffer);
}
}
std::unordered_map<transaction_identifier, transaction_status>::iterator find_transaction(std::unordered_map<transaction_identifier, transaction_status> map,
transaction_identifier id) {
auto iter = map.begin();
while (iter != map.end()) {
if (strcmp((const char*) iter->first, (const char*) id) == 0) {
break;
}
++iter;
}
return iter;
}
bool transaction_present(std::unordered_map<transaction_identifier, transaction_status> map, transaction_identifier id) {
auto iter = map.begin();
while (iter != map.end()) {
if (strcmp((const char*) iter->first, (const char*) id) == 0) {
return true;
}
++iter;
}
return false;
}

View File

@@ -0,0 +1,269 @@
/*
==================================================================================
Copyright (c) 2019-2020 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==================================================================================
*/
/*
* subs_mgmt.hpp
* Created on: 2019
* Author: Ashwin Shridharan, Shraboni Jana
*/
#pragma once
#ifndef SUBSCRIPTION_HANDLER
#define SUBSCRIPTION_HANDLER
#include <functional>
#include <mdclog/mdclog.h>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include <chrono>
#include <tuple>
#include "../xapp-asn/e2ap/subscription_delete_request.hpp"
#include "../xapp-asn/e2ap/subscription_delete_response.hpp"
#include "../xapp-asn/e2ap/subscription_request.hpp"
#include "../xapp-asn/e2ap/subscription_response.hpp"
#define SUBSCR_SUCCESS 1
#define SUBSCR_ERR_TX -1
#define SUBSCR_ERR_TIMEOUT -2
#define SUBSCR_ERR_FAIL -3
#define SUBSCR_ERR_UNKNOWN -4
#define SUBSCR_ERR_DUPLICATE -5
using namespace std;
class TransmitterBase
{
public:
virtual ~TransmitterBase() {}
template<class T>
const T& getParam() const; //to be implemented after Parameter
template<class T, class U>
void setParam(const U& rhs); //to be implemented after Parameter
};
template <typename T>
class Transmitter : public TransmitterBase
{
public:
Transmitter(const T& tx) :obj(tx) {}
const T& getParam() const {return obj;}
void setParam(const T& tx) {obj=tx;}
private:
T obj;
};
//Here's the trick: dynamic_cast rather than virtual
template<class T> const T& TransmitterBase::getParam() const
{
return dynamic_cast<const Transmitter<T>&>(*this).getParam();
}
template<class T, class U> void TransmitterBase::setParam(const U& rhs)
{
dynamic_cast<Transmitter<T>&>(*this).setParam(rhs);
return;
}
typedef enum {
request_pending = 1,
request_success,
request_failed,
delete_request_pending,
delete_request_success,
delete_request_failed,
request_duplicate
}Subscription_Status_Types;
using transaction_identifier = unsigned char*;
using transaction_status = Subscription_Status_Types;
class SubscriptionHandler {
public:
SubscriptionHandler(unsigned int timeout_seconds = 10);
template <typename AppTransmitter>
int manage_subscription_request(transaction_identifier, AppTransmitter &&);
template <typename AppTransmitter>
int manage_subscription_delete_request(transaction_identifier, AppTransmitter &&);
void manage_subscription_response(int message_type, transaction_identifier id, const void *message_payload, size_t message_len);
int const get_request_status(transaction_identifier);
bool set_request_status(transaction_identifier, transaction_status);
bool is_request_entry(transaction_identifier);
void set_timeout(unsigned int);
void clear(void);
void set_ignore_subs_resp(bool b){_ignore_subs_resp = b;};
private:
bool add_request_entry(transaction_identifier, transaction_status);
bool delete_request_entry(transaction_identifier);
template <typename AppTransmitter>
bool add_transmitter_entry(transaction_identifier, AppTransmitter&&);
std::unordered_map<transaction_identifier, TransmitterBase> trans_table;
std::unordered_map<transaction_identifier, transaction_status> status_table;
std::unique_ptr<std::mutex> _data_lock;
std::unique_ptr<std::condition_variable> _cv;
std::chrono::seconds _time_out;
bool _ignore_subs_resp = false;
};
template <typename AppTransmitter>
bool SubscriptionHandler::add_transmitter_entry(transaction_identifier id, AppTransmitter &&trans){
// add entry in hash table if it does not exist
//auto search = trans_table.find(id);
auto search = trans_table.begin();
while (search != trans_table.end()) {
if (strcmp((const char*) search->first, (const char*) id) == 0) {
break;
}
++search;
}
if(search != trans_table.end()){
return false;
}
Transmitter<AppTransmitter> tptr(trans);
trans_table[id] = tptr;
return true;
};
//this will work for both sending subscription request and subscription delete request.
//The handler is oblivious of the message content and follows the transaction id.
template<typename AppTransmitter>
int SubscriptionHandler::manage_subscription_request(transaction_identifier rmr_trans_id, AppTransmitter && tx){
int res;
// put entry in request table
{
std::lock_guard<std::mutex> lock(*(_data_lock.get()));
res = add_request_entry(rmr_trans_id, request_pending);
if(! res){
mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s to queue because request with identical key already present", __FILE__, __LINE__, rmr_trans_id);
return SUBSCR_ERR_DUPLICATE;
}
else {
std::cout << "Added status pending to request " << rmr_trans_id << std::endl;
}}
// acquire lock ...
std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
// Send the message
bool flg = tx();
if (!flg){
// clear state
delete_request_entry(rmr_trans_id);
mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s", __FILE__, __LINE__, rmr_trans_id );
return SUBSCR_ERR_TX;
} else {
mdclog_write(MDCLOG_INFO, "%s, %d :: Transmitted subscription request for trans_id %s", __FILE__, __LINE__, rmr_trans_id );
add_transmitter_entry(rmr_trans_id, tx);
}
// record time stamp ..
auto start = std::chrono::system_clock::now();
res = SUBSCR_ERR_UNKNOWN;
while(1){
// release lock and wait to be woken up
_cv.get()->wait_for(_local_lock, _time_out);
// we have woken and acquired data_lock
// check status and return appropriate object
int status = get_request_status(rmr_trans_id);
if (status == request_success){
mdclog_write(MDCLOG_INFO, "Successfully subscribed for request for trans_id %s", rmr_trans_id);
res = SUBSCR_SUCCESS;
break;
}
if (status == request_pending){
// woken up spuriously or timed out
auto end = std::chrono::system_clock::now();
std::chrono::duration<double> f = end - start;
if ( f > _time_out){
mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request with transaction id %s timed out waiting for response ", __FILE__, __LINE__, rmr_trans_id);
//res = SUBSCR_ERR_TIMEOUT;
//sunny side scenario. assuming subscription response is received.
res = SUBSCR_SUCCESS;
break;
}
else{
mdclog_write(MDCLOG_INFO, "Subscription request with transaction id %s Waiting for response....", rmr_trans_id);
continue;
}
}
if(status == request_failed){
mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s got failure response .. \n", __FILE__, __LINE__, rmr_trans_id);
res = SUBSCR_ERR_FAIL;
break;
}
if (status == request_duplicate){
mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, rmr_trans_id);
res = SUBSCR_ERR_DUPLICATE;
break;
}
// if we are here, some spurious
// status obtained or request failed . we return appropriate error code
mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, and state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, rmr_trans_id, status);
res = SUBSCR_ERR_UNKNOWN;
break;
};
delete_request_entry(rmr_trans_id);
// release data lock
_local_lock.unlock();
// std::cout <<"Returning res = " << res << " for request = " << rmr_trans_id << std::endl;
return res;
};
std::unordered_map<transaction_identifier, transaction_status>::iterator find_transaction(std::unordered_map<transaction_identifier, transaction_status> map,
transaction_identifier id);
bool transaction_present(std::unordered_map<transaction_identifier, transaction_status> map, transaction_identifier id);
#endif

View File

@@ -0,0 +1,21 @@
/*
* xapp_handler.hpp
*
* Created on: Mar 16, 2020
* Author: Shraboni Jana
*/
#ifndef SRC_XAPP_MGMT_XAPP_HANDLER_HPP_
#define SRC_XAPP_MGMT_XAPP_HANDLER_HPP_
class XappHandler{
XappHandler *xhandler;
public:
virtual ~XappHandler(){delete xhandler;};
virtual void register_handler(XappHandler *xhandler) = 0;
virtual XappHandler* get_handler() = 0;
};
#endif /* SRC_XAPP_MGMT_XAPP_HANDLER_HPP_ */