Tutorials/i1Net/mgen: mgen_oml.2.patch
File mgen_oml.2.patch, 14.6 KB (added by , 7 years ago) |
---|
-
include/CWriteOml.h
diff -Naur mgen.orig/include/CWriteOml.h MGEN_SRC.oml/include/CWriteOml.h
old new 1 #ifndef _CWRITEOML_H_ 2 #define _CWRITEOML_H_ 3 4 #include <iostream> 5 #include <vector> 6 #include <map> 7 #include <string.h> 8 #include "oml2/omlc.h" 9 10 #include <boost/lexical_cast.hpp> 11 12 13 #define FALSE (unsigned int) 0 14 #define TRUE (unsigned int) 1 15 16 17 class CWriteOml 18 { 19 private: 20 unsigned int bReady; 21 unsigned int _MeasurementPoints; 22 std::string mHostName; 23 OmlValueU* _values; 24 25 std::map<std::string, std::pair<OmlValueT, OmlValueU*> > _KTVMap; 26 std::map<std::string, std::pair<OmlValueT, OmlValueU*> >::iterator _KTVMapIter; 27 28 29 OmlMPDef* mp_def; 30 OmlMP* _mp_handle; 31 std::string _db_filename; 32 std::string _server_name; 33 34 void createMeasurementPoint(OmlMPDef* pOmlMPDef, std::string str, OmlValueT type); 35 36 public: 37 CWriteOml(); 38 ~CWriteOml(); 39 40 void CWriteOML(std::string db_filename, std::string server_name); 41 void init(std::string db_filename, std::string server_name); 42 void start( std::vector< std::pair<std::string, OmlValueT> >& oml_key_list ); 43 void set_key(std::string key_str, void* val_ptr); 44 void insert(); 45 void stop(); 46 47 unsigned int isReady() { return bReady; } 48 49 }; 50 51 #endif -
include/mgen.h
diff -Naur mgen.orig/include/mgen.h MGEN_SRC.oml/include/mgen.h
old new 5 5 #include "mgenFlow.h" 6 6 #include "mgenGlobals.h" 7 7 #include "mgenMsg.h" 8 #include "CWriteOml.h" 9 10 11 #define DBG_OUT(x) std::cerr << "[" << __FILE__ << "] " << #x << " = " << x << std::endl; 8 12 9 13 class MgenController 10 14 { … … 104 108 class Mgen 105 109 { 106 110 public: 111 CWriteOml oml; 112 107 113 enum {SCRIPT_LINE_MAX = 8192}; // maximum script line length 108 114 109 115 Mgen(ProtoTimerMgr& timerMgr, -
makefiles/Makefile.common
diff -Naur mgen.orig/makefiles/Makefile.common MGEN_SRC.oml/makefiles/Makefile.common
old new 41 41 $(COMMON)/mgenTransport.cpp $(COMMON)/mgenPattern.cpp \ 42 42 $(COMMON)/mgenPayload.cpp \ 43 43 $(COMMON)/mgenSequencer.cpp \ 44 $(COMMON)/CWriteOml.cpp \ 44 45 $(COMMON)/gpsPub.cpp $(COMMON)/mgenAppSinkTransport.cpp 45 46 46 47 MGEN_OBJ = $(MGEN_SRC:.cpp=.o) -
makefiles/Makefile.linux
diff -Naur mgen.orig/makefiles/Makefile.linux MGEN_SRC.oml/makefiles/Makefile.linux
old new 7 7 # 8 8 SYSTEM_INCLUDES = -I/usr/X11R6/include 9 9 SYSTEM_LDFLAGS = -L/usr/X11R6/lib 10 SYSTEM_LIBS = -ldl -lpthread -lpcap 10 SYSTEM_LIBS = -ldl -lpthread -lpcap -loml2 -locomm 11 11 12 12 # 2) System specific capabilities 13 13 # Must choose appropriate for the following: -
src/common/CWriteOml.cpp
diff -Naur mgen.orig/src/common/CWriteOml.cpp MGEN_SRC.oml/src/common/CWriteOml.cpp
old new 1 2 #include "CWriteOml.h" 3 #include <iostream> 4 #include <vector> 5 #include <map> 6 #include <string.h> 7 #include "oml2/omlc.h" 8 9 //#include <boost/lexical_cast.hpp> 10 11 12 CWriteOml::CWriteOml() 13 { 14 } 15 16 void CWriteOml::CWriteOML(std::string db_filename, std::string server_name) 17 { 18 bReady = FALSE; 19 init(db_filename, server_name); 20 } 21 22 CWriteOml::~CWriteOml() 23 { 24 omlc_close(); 25 } 26 27 28 void CWriteOml::init(std::string db_filename, std::string server_name) 29 { 30 _db_filename = db_filename; 31 _server_name = server_name; 32 33 std::string fname; 34 int argc; 35 const char** argv; 36 std::vector<char*> arg_vector; 37 38 //char chostname[32]; 39 //for (int i = 0; i < 32;++i) 40 // chostname[i] = '\0'; 41 //gethostname(chostname, 31); 42 //mHostName = std::string(chostname); 43 44 std::string mode(server_name.c_str()); 45 46 if (mode == "file") 47 { 48 fname = db_filename;// + "_" + mHostName; 49 std::cout << fname << std::endl; 50 argc = 7; 51 const char* argv_file[] = {"./spectrum", "--oml-id",(const char*)"et", "--oml-exp-id",db_filename.c_str(), "--oml-file",fname.c_str()}; 52 argv = argv_file; 53 } 54 else 55 { 56 // Following two lines were being optimized out 57 //const char* argv_server[] = {"./spectrum", "--oml-id",(const char*)"et", "--oml-domain",db_filename.c_str(), "--oml-collect", server_name.c_str() }; 58 //argv = argv_server; 59 60 arg_vector.push_back((char*)"./spectrum"); 61 arg_vector.push_back((char*)"--oml-id"); 62 arg_vector.push_back((char*)"et"); 63 arg_vector.push_back((char*)"--oml-domain"); 64 arg_vector.push_back((char*)db_filename.c_str()); 65 arg_vector.push_back((char*)"--oml-collect"); 66 arg_vector.push_back((char*)server_name.c_str()); 67 argv = (const char**)&arg_vector[0]; 68 69 argc = arg_vector.size(); // argc = 7; 70 71 } 72 73 int result = omlc_init ("_mp_", &argc, argv, NULL); 74 if (result == -1) { 75 std::cerr << "Could not initialize OML\n"; 76 exit (1); 77 } 78 79 } 80 81 82 void CWriteOml::start( std::vector<std::pair<std::string, OmlValueT> >& _OmlKeys ) 83 { 84 int result; 85 86 _MeasurementPoints = _OmlKeys.size(); 87 88 mp_def = new OmlMPDef [(sizeof(OmlMPDef) * (_MeasurementPoints + 1) )]; 89 90 // define measurement points 91 unsigned int idx; 92 for (idx = 0; idx < _MeasurementPoints; ++idx) 93 createMeasurementPoint(&mp_def[idx], _OmlKeys.at(idx).first, (OmlValueT)_OmlKeys.at(idx).second); 94 createMeasurementPoint(&mp_def[idx], "NULL", (OmlValueT)0); 95 96 _mp_handle = omlc_add_mp (_db_filename.c_str(), mp_def); // using db_filename as tag name for measurement point 97 98 if (_mp_handle == NULL) { 99 std::cerr << "Error: could not register Measurement Point \"data\""; 100 exit (1); 101 } 102 103 result = omlc_start(); 104 if (result == -1) { 105 std::cerr << "Error starting up OML measurement streams\n"; 106 exit (1); 107 } 108 109 // allocate memory measurement points 110 _values = (OmlValueU*) malloc(sizeof(OmlValueU) * _MeasurementPoints); 111 memset((void*)_values, 0, sizeof(OmlValueU) * _MeasurementPoints ); 112 113 // create oml key <==> (type,value) mapping 114 _KTVMap.clear(); 115 for (unsigned int idx = 0; idx < _MeasurementPoints; ++idx) { 116 std::pair<OmlValueT, OmlValueU*> TV (_OmlKeys.at(idx).second, (OmlValueU*)&_values[idx] ); 117 118 std::pair<std::string, std::pair<OmlValueT, OmlValueU*> > KTV( _OmlKeys.at(idx).first, TV ); 119 _KTVMap.insert( KTV ); 120 } 121 122 bReady = TRUE; 123 } 124 125 126 void CWriteOml::set_key(std::string key_str, void* val_ptr) 127 { 128 129 _KTVMapIter = _KTVMap.find(key_str); 130 if (_KTVMapIter == _KTVMap.end()) { 131 std::cerr << key_str << " not found" << std::endl; 132 return; // key not found so return and do nothing 133 } 134 135 //key found to look at type are call appropriate oml intrinsic function 136 std::pair<OmlValueT, OmlValueU*> TV = _KTVMapIter->second; 137 switch( TV.first ) { 138 case OML_INT32_VALUE : 139 omlc_set_int32 ( *(TV.second), (int32_t) (*((int32_t*)val_ptr))); 140 break; 141 142 case OML_UINT32_VALUE : 143 omlc_set_uint32 ( *(TV.second), (uint32_t) (*((uint32_t*)val_ptr))); 144 break; 145 146 case OML_INT64_VALUE : 147 omlc_set_int64 ( *(TV.second), (int64_t) (*((int64_t*)val_ptr))); 148 break; 149 case OML_DOUBLE_VALUE : 150 omlc_set_double ( *(TV.second), (double) (*((double*)val_ptr))); 151 break; 152 case OML_STRING_VALUE : 153 omlc_set_string( *(TV.second), (char*)val_ptr); 154 break; 155 // add other cases here 156 default : 157 std::cerr << "OML - unrecognizeg type, value: " << TV.first << " , " << TV.second << std::endl; 158 break; 159 } 160 161 return; 162 } 163 164 void CWriteOml::createMeasurementPoint(OmlMPDef* pOmlMPDef, std::string str, OmlValueT type) 165 { 166 char* cptr; 167 if (str == "NULL") { 168 pOmlMPDef->name = NULL; 169 pOmlMPDef->param_types = type; 170 } 171 else { 172 cptr = new char[str.size()+1]; 173 strcpy (cptr, str.c_str()); 174 pOmlMPDef->name = cptr; 175 pOmlMPDef->param_types = type; 176 } 177 } 178 179 void CWriteOml::insert() 180 { 181 omlc_inject (_mp_handle, _values); 182 } 183 184 void CWriteOml::stop() 185 { 186 omlc_close(); 187 free(_values); 188 } -
src/common/mgenApp.cpp
diff -Naur mgen.orig/src/common/mgenApp.cpp MGEN_SRC.oml/src/common/mgenApp.cpp
old new 16 16 #include <unistd.h> 17 17 #include <fcntl.h> 18 18 #endif // UNIX 19 20 #include <boost/algorithm/string.hpp> 21 22 23 19 24 MgenApp::MgenApp() 20 25 : mgen(GetTimerMgr(), GetSocketNotifier()), 21 26 control_pipe(ProtoPipe::MESSAGE), control_remote(false), … … 56 61 " [queue <queueSize>][broadcast {on|off}]\n" 57 62 " [convert <binaryLog>][debug <debugLevel>]\n" 58 63 " [gpskey <gpsSharedMemoryLocation>]\n" 59 " [boost] [reuse {on|off}]\n"); 64 " [boost] [reuse {on|off}]\n" 65 " [oml <server:port[,omlFile]>]\n"); 66 60 67 } // end MgenApp::Usage() 61 68 62 69 … … 80 87 "+gpskey", // Override default gps shared memory location 81 88 "+logdata", // log optional data attribute? default ON 82 89 "+loggpsdata", // log gps data? default ON 90 "+oml", // oml: send data to this address:port 83 91 NULL 84 92 }; 85 93 … … 447 455 if (!dispatcher.BoostPriority()) 448 456 fprintf(stderr,"Unable to boost process priority.\n"); 449 457 } 458 else if (!strncmp("oml", lowerCmd, len)) 459 { 460 std::string temp( val ); 461 std::vector<std::string> sToken; 462 boost::split(sToken, temp, boost::is_any_of(",") ); 463 464 #if 1 465 466 std::string omlDbFilename("oml_mgen_test"); 467 std::string omlServerName; 468 469 omlServerName = sToken.at(0); 470 if (sToken.size() == 2) { 471 omlDbFilename = sToken.at(1); 472 } 473 474 mgen.oml.init(omlDbFilename, omlServerName ); 475 std::vector< std::pair<std::string, OmlValueT> > _omlKeys; // this is just used here to hold all identifiers and values in one container 476 // before passing to start(). 477 478 _omlKeys.push_back( std::make_pair("proto", OML_STRING_VALUE)); 479 _omlKeys.push_back( std::make_pair("flowid", OML_INT32_VALUE)); 480 _omlKeys.push_back( std::make_pair("seq_num", OML_INT32_VALUE)); 481 _omlKeys.push_back( std::make_pair("src_addr", OML_STRING_VALUE)); 482 _omlKeys.push_back( std::make_pair("src_port", OML_INT32_VALUE)); 483 _omlKeys.push_back( std::make_pair("dst_addr", OML_STRING_VALUE)); 484 _omlKeys.push_back( std::make_pair("dst_port", OML_INT32_VALUE)); 485 _omlKeys.push_back( std::make_pair("data_len", OML_INT32_VALUE)); 486 487 488 //_omlKeys.push_back( std::make_pair("a_double64", OML_DOUBLE_VALUE) ); 489 //_omlKeys.push_back( std::make_pair("a____int32", OML_INT32_VALUE)); 490 //_omlKeys.push_back( std::make_pair("a___string", OML_STRING_VALUE)); 491 //_omlKeys.push_back( std::make_pair("ExecTime", OML_DOUBLE_VALUE)); 492 mgen.oml.start( _omlKeys ); 493 #endif 494 } 450 495 else if (!strncmp("help", lowerCmd, len)) 451 496 { 452 497 fprintf(stderr, "mgen: version %s\n", MGEN_VERSION); -
src/common/mgenTransport.cpp
diff -Naur mgen.orig/src/common/mgenTransport.cpp MGEN_SRC.oml/src/common/mgenTransport.cpp
old new 305 305 { 306 306 case SEND_EVENT: 307 307 { 308 if (mgen.GetLogTx()) 309 { 310 theMsg->LogSendEvent(mgen.GetLogFile(), 311 mgen.GetLogBinary(), 312 mgen.GetLocalTime(), 313 buffer, 314 mgen.GetLogFlush(), 315 theTime); 316 } 317 break; 308 309 #if 1 310 if ( mgen.oml.isReady() == TRUE ) { 311 std::string protocol_string ( MgenEvent::GetStringFromProtocol(protocol) ); 312 unsigned int u32_flowid = theMsg->GetFlowId(); 313 unsigned int u32_seqnum = theMsg->GetSeqNum(); 314 std::string str_srcaddr = theMsg->GetSrcAddr().GetHostString(); 315 unsigned int u32_srcport = theMsg->GetSrcAddr().GetPort(); 316 std::string str_dstaddr = theMsg->GetDstAddr().GetHostString(); 317 unsigned int u32_dstport = theMsg->GetDstAddr().GetPort(); 318 unsigned int u32_datalen = theMsg->GetMgenMsgLen(); 319 320 mgen.oml.set_key("proto", (void*)protocol_string.c_str() ); 321 mgen.oml.set_key("flowid", (void*)&u32_flowid ); 322 mgen.oml.set_key("seq_num", (void*)&u32_seqnum ); 323 mgen.oml.set_key("src_addr", (void*)str_srcaddr.c_str() ); 324 mgen.oml.set_key("src_port", (void*)&u32_srcport ); 325 mgen.oml.set_key("dst_addr", (void*)str_dstaddr.c_str() ); 326 mgen.oml.set_key("dst_port", (void*)&u32_dstport ); 327 mgen.oml.set_key("data_len", (void*)&u32_datalen ); 328 mgen.oml.insert(); 329 } 330 #endif 331 332 if (mgen.GetLogTx()) 333 { 334 theMsg->LogSendEvent(mgen.GetLogFile(), 335 mgen.GetLogBinary(), 336 mgen.GetLocalTime(), 337 buffer, 338 mgen.GetLogFlush(), 339 theTime); 340 } 341 break; 318 342 } 319 343 case RECV_EVENT: 320 344 { … … 328 352 mgen.GetLogFlush(), 329 353 theTime); 330 354 355 #if 1 356 if ( mgen.oml.isReady() == TRUE ) { 357 std::string protocol_string ( MgenEvent::GetStringFromProtocol(protocol) ); 358 unsigned int u32_flowid = theMsg->GetFlowId(); 359 unsigned int u32_seqnum = theMsg->GetSeqNum(); 360 std::string str_srcaddr = theMsg->GetSrcAddr().GetHostString(); 361 unsigned int u32_srcport = theMsg->GetSrcAddr().GetPort(); 362 std::string str_dstaddr = theMsg->GetDstAddr().GetHostString(); 363 unsigned int u32_dstport = theMsg->GetDstAddr().GetPort(); 364 unsigned int u32_datalen = theMsg->GetMsgLen(); 365 366 mgen.oml.set_key("proto", (void*)protocol_string.c_str() ); 367 mgen.oml.set_key("flowid", (void*)&u32_flowid ); 368 mgen.oml.set_key("seq_num", (void*)&u32_seqnum ); 369 mgen.oml.set_key("src_addr", (void*)str_srcaddr.c_str() ); 370 mgen.oml.set_key("src_port", (void*)&u32_srcport ); 371 mgen.oml.set_key("dst_addr", (void*)str_dstaddr.c_str() ); 372 mgen.oml.set_key("dst_port", (void*)&u32_dstport ); 373 mgen.oml.set_key("data_len", (void*)&u32_datalen ); 374 mgen.oml.insert(); 375 } 376 #endif 377 378 331 379 // Don't we want rapr to get the message regardless of logging?? 332 380 // Could this possibly have been broken too? strange... ljt 333 381 if (mgen.GetController())