AliRoot Core  v5-06-30 (35d6c57)
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
AliZMQManager.cxx
Go to the documentation of this file.
1 #include "AliZMQManager.h"
2 
3 #include <iostream>
4 #include <sstream>
5 #include <stdio.h>
6 #include <unistd.h>
7 #include <string.h>
8 #include <fstream>
9 
10 #include <TList.h>
11 #include <TStreamerInfo.h>
12 #include <TThread.h>
13 
14 #include "AliESDEvent.h"
15 #include "AliESDtrack.h"
16 #include "AliTrackPointArray.h"
17 #include "AliESDfriendTrack.h"
18 #include "AliExternalTrackParam.h"
19 #include "AliTrackerBase.h"
20 #include "AliTracker.h"
21 
22 #include "zmq.h"
23 
24 using namespace std;
25 
27 
33 {
34  //read config file
35  ifstream configFile (GetConfigFilePath());
36 
37  if (configFile.is_open())
38  {
39  string line;
40  int from,to;
41  while(configFile.good())
42  {
43  getline(configFile,line);
44  from = line.find("\"")+1;
45  to = line.find_last_of("\"");
46  if(line.find("STORAGE_SERVER=")==0)
47  {
48  fStorageServer=line.substr(from,to-from);
49  }
50  else if(line.find("EVENT_SERVER=")==0)
51  {
52  fEventServer=line.substr(from,to-from);
53  }
54  else if(line.find("STORAGE_SERVER_PORT=")==0)
55  {
56  fStorageServerPort=atoi(line.substr(from,to-from).c_str());
57  }
58  else if(line.find("EVENT_SERVER_PORT=")==0)
59  {
60  fEventServerPort=atoi(line.substr(from,to-from).c_str());
61  }
62  else if(line.find("STORAGE_CLIENT_PORT=")==0)
63  {
64  fStorageClientPort=atoi(line.substr(from,to-from).c_str());
65  }
66  else if(line.find("XML_SERVER_PORT=")==0)
67  {
68  fXmlServerPort=atoi(line.substr(from,to-from).c_str());
69  }
70  }
71  if(configFile.eof()){configFile.clear();}
72  configFile.close();
73  }
74  else{cout<<"EVENT MANAGER -- Unable to open config file"<<endl;}
75  for(int i=0;i<NUMBER_OF_SOCKETS;i++){fContexts[i] = zmq_ctx_new();}
76 }
77 
82 {
83  // close sockets and destroy contexts
84  for (int i=0; i<NUMBER_OF_SOCKETS; i++)
85  {
86  zmq_close(fSockets[i]);
87  zmq_ctx_destroy(fContexts[i]);
88  }
89 }
90 
95 {
96  TThread::Lock();
97  if(fManagerInstance==0)
98  {
99  cout<<"\n\nMANAGER -- creating new instance of ZMQ Manager\n\n"<<endl;
100  fManagerInstance = new AliZMQManager();
101  }
102  TThread::UnLock();
103  return fManagerInstance;
104 }
105 
112 void AliZMQManager::CreateSocket(storageSockets socket)
113 {
114  int timeout = 5000;
115  int linger = -1;
116 
117  switch (socket)
118  {
119  case SERVER_COMMUNICATION_REQ:
120  {
121  // server communication - REQ
122  fSockets[SERVER_COMMUNICATION_REQ] = zmq_socket(fContexts[SERVER_COMMUNICATION_REQ],ZMQ_REQ);
123 
124  if(0 != zmq_setsockopt(fSockets[SERVER_COMMUNICATION_REQ],ZMQ_RCVTIMEO,&timeout,sizeof(int)))
125  {cout<<"MANAGER -- create socket 1 -- "<<zmq_strerror(zmq_errno())<<endl;}
126  if(0 != zmq_setsockopt(fSockets[SERVER_COMMUNICATION_REQ],ZMQ_LINGER,&linger,sizeof(int)))
127  {cout<<"MANAGER -- create socket 1a -- "<<zmq_strerror(zmq_errno())<<endl;}
128 
129  if(0 != zmq_connect(fSockets[SERVER_COMMUNICATION_REQ],Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort)))
130  {cout<<"MANAGER -- create socket 1b -- "<<zmq_strerror(zmq_errno())<<endl;}
131  break;
132  }
133  case SERVER_COMMUNICATION_REP:
134  {
135  // server communication - REP
136  fSockets[SERVER_COMMUNICATION_REP] = zmq_socket(fContexts[SERVER_COMMUNICATION_REP],ZMQ_REP);
137 
138  if(0 != zmq_setsockopt(fSockets[SERVER_COMMUNICATION_REP],ZMQ_RCVTIMEO,&timeout,sizeof(int)))
139  {cout<<"MANAGER -- create socket 2 -- "<<zmq_strerror(zmq_errno())<<endl;}
140  if(0 != zmq_setsockopt(fSockets[SERVER_COMMUNICATION_REP],ZMQ_LINGER,&linger,sizeof(int)))
141  {cout<<"MANAGER -- create socket 2a -- "<<zmq_strerror(zmq_errno())<<endl;}
142 
143  if(0 != zmq_bind(fSockets[SERVER_COMMUNICATION_REP],Form("tcp://*:%d",fStorageServerPort)))
144  {cout<<"MANAGER -- create socket 2b -- "<<zmq_strerror(zmq_errno())<<endl;}
145  break;
146  }
147  case CLIENT_COMMUNICATION_REQ:
148  {
149  // client communication - REQ
150  fSockets[CLIENT_COMMUNICATION_REQ] = zmq_socket(fContexts[CLIENT_COMMUNICATION_REQ],ZMQ_REQ);
151 
152  if(0 != zmq_setsockopt(fSockets[CLIENT_COMMUNICATION_REQ],ZMQ_RCVTIMEO,&timeout,sizeof(int)))
153  {cout<<"MANAGER -- create socket 3 -- "<<zmq_strerror(zmq_errno())<<endl;}
154  if(0 != zmq_setsockopt(fSockets[CLIENT_COMMUNICATION_REQ],ZMQ_LINGER,&linger,sizeof(int)))
155  {cout<<"MANAGER -- create socket 3a -- "<<zmq_strerror(zmq_errno())<<endl;}
156 
157  if(0 != zmq_connect(fSockets[CLIENT_COMMUNICATION_REQ],Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort)))
158  {cout<<"MANAGER -- create socket 3b -- "<<zmq_strerror(zmq_errno())<<endl;}
159  break;
160  }
161  case CLIENT_COMMUNICATION_REP:
162  {
163  // client communication - REP
164  fSockets[CLIENT_COMMUNICATION_REP] = zmq_socket(fContexts[CLIENT_COMMUNICATION_REP],ZMQ_REP);
165 
166  if(0 != zmq_setsockopt(fSockets[CLIENT_COMMUNICATION_REP],ZMQ_RCVTIMEO,&timeout,sizeof(int)))
167  {cout<<"MANAGER -- create socket 4 -- "<<zmq_strerror(zmq_errno())<<endl;}
168  if(0 != zmq_setsockopt(fSockets[CLIENT_COMMUNICATION_REP],ZMQ_LINGER,&linger,sizeof(int)))
169  {cout<<"MANAGER -- create socket 4a -- "<<zmq_strerror(zmq_errno())<<endl;}
170 
171  if(0 != zmq_bind(fSockets[CLIENT_COMMUNICATION_REP],Form("tcp://*:%d",fStorageClientPort)))
172  {cout<<"MANAGER -- create socket 4b -- "<<zmq_strerror(zmq_errno())<<endl;}
173  break;
174  }
175  case EVENTS_SERVER_PUB:
176  {
177  // events publisher
178  fSockets[EVENTS_SERVER_PUB] = zmq_socket(fContexts[EVENTS_SERVER_PUB],ZMQ_PUB);
179  if(0 != zmq_bind(fSockets[EVENTS_SERVER_PUB],Form("tcp://*:%d",fEventServerPort)))
180  {cout<<"MANAGER -- create socket 5 -- "<<zmq_strerror(zmq_errno())<<endl;}
181 
182  break;
183  }
184  case EVENTS_SERVER_SUB:
185  {
186  // events subscriber
187  fSockets[EVENTS_SERVER_SUB] = zmq_socket(fContexts[EVENTS_SERVER_SUB],ZMQ_SUB);
188  if(0 != zmq_setsockopt(fSockets[EVENTS_SERVER_SUB],ZMQ_SUBSCRIBE,"",0))
189  {cout<<"MANAGER -- create socket 6b -- "<<zmq_strerror(zmq_errno())<<endl;}
190  if(0 != zmq_connect(fSockets[EVENTS_SERVER_SUB],Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort)))
191  {cout<<"MANAGER -- create socket 6c -- "<<zmq_strerror(zmq_errno())<<endl;}
192 
193  break;
194  }
195  case XML_PUB:
196  {
197  // xml publisher
198  fSockets[XML_PUB] = zmq_socket(fContexts[XML_PUB],ZMQ_PUB);
199  if(0 != zmq_bind(fSockets[XML_PUB],Form("tcp://*:%d",fXmlServerPort)))
200  {cout<<"MANAGER -- create socket 7 -- "<<zmq_strerror(zmq_errno())<<endl;}
201  break;
202  }
203  default:break;
204  }
205 }
206 
213 bool AliZMQManager::Send(vector<serverListStruct> list,storageSockets socket)
214 {
215  //send size of the struct first
216  int numberOfRecords = list.size();
217  cout<<"MANAGER -- sending vector with "<<numberOfRecords<<" records"<<endl;
218  zmq_msg_t buffer;
219 
220  if(!zmqInit(&buffer,sizeof(int))){return false;}
221  memcpy(zmq_msg_data(&buffer),&numberOfRecords,sizeof(int));
222 
223  if(!zmqSend(&buffer,fSockets[socket],0))
224  {
225  cout<<"MANAGER -- couldn't send list's size"<<endl;
226  zmq_msg_close(&buffer);
227  return false;
228  }
229  if(!zmqRecv(&buffer,fSockets[socket],0))
230  {
231  cout<<"MANAGER -- couldn't receive message inside Send vector call. This may cause serious problems!"<<endl;
232  zmq_msg_close(&buffer);
233  return false;
234  }
235  if(numberOfRecords==0)
236  {
237  cout<<"MANAGER -- list size = 0"<<endl;
238 // return false;
239  }
240  zmq_msg_close(&buffer);
241 
242  zmq_msg_t message;
243  if(!zmqInit(&message,sizeof(serverListStruct)*numberOfRecords)){return false;}
244  memcpy(zmq_msg_data(&message),reinterpret_cast<void*> (&list[0]), sizeof(serverListStruct)*numberOfRecords);
245 
246  if(!zmqSend(&message,fSockets[socket],0))
247  {
248  zmq_msg_close(&message);
249  return false;
250  }
251  zmq_msg_close(&message);
252  return true;
253 }
254 
263 bool AliZMQManager::Send(vector<string100> list,storageSockets socket)
264 {
265  cout<<"MANAGER -- sending vector of string100:"<<endl;
266  vector<string100>::iterator it;
267  for (it = list.begin(); it != list.end(); ++it)
268  {
269  cout<<(*it).data<<endl;
270  }
271 
272  //send size of the set first
273  int numberOfRecords = list.size();
274  cout<<"MANAGER -- sending set with "<<numberOfRecords<<" records"<<endl;
275  zmq_msg_t buffer;
276 
277  if(!zmqInit(&buffer,sizeof(int))){return false;}
278  memcpy(zmq_msg_data(&buffer),&numberOfRecords,sizeof(int));
279 
280  if(!zmqSend(&buffer,fSockets[socket],0))
281  {
282  cout<<"MANAGER -- couldn't send list's size"<<endl;
283  zmq_msg_close(&buffer);
284  return false;
285  }
286  if(!zmqRecv(&buffer,fSockets[socket],0))
287  {
288  cout<<"MANAGER -- couldn't receive message inside Send set call. This may cause serious problems!"<<endl;
289  zmq_msg_close(&buffer);
290  return false;
291  }
292  if(numberOfRecords==0)
293  {
294  cout<<"MANAGER -- list size = 0"<<endl;
295  // return false;
296  }
297  zmq_msg_close(&buffer);
298 
299  zmq_msg_t message;
300  if(!zmqInit(&message,sizeof(string100)*numberOfRecords)){return false;}
301 // void* tmpVector = (void*)malloc(sizeof(string30)*myVector.size());
302  memcpy(zmq_msg_data(&message),reinterpret_cast<void*> (&list[0]), sizeof(string100)*numberOfRecords);
303 
304 
305  if(!zmqSend(&message,fSockets[socket],0))
306  {
307  zmq_msg_close(&message);
308  return false;
309  }
310  zmq_msg_close(&message);
311  return true;
312 }
313 
328 bool AliZMQManager::Send(struct serverRequestStruct *request,storageSockets socket)
329 {
330  size_t sizeOfRequest = sizeof(struct serverRequestStruct);/*+sizeof(struct listRequestStruct)+sizeof(struct eventStruct);*/
331 
332  zmq_msg_t buffer;
333  if(!zmqInit(&buffer,sizeOfRequest)){return false;}
334  memcpy(zmq_msg_data(&buffer),request,sizeOfRequest);
335  if(!zmqSend(&buffer,fSockets[socket],0))
336  {
337  zmq_msg_close(&buffer);
338  return false;
339  }
340  zmq_msg_close(&buffer);
341  return true;
342 }
343 
358 bool AliZMQManager::Send(struct clientRequestStruct *request,storageSockets socket)
359 {
360  cout<<"MANAGER -- sending clientRequestStruct:"<<request->messageType<<"\t"<<endl;
361 
362  //put clientRequestStruct in buffer
363  zmq_msg_t buffer;
364  if(!zmqInit(&buffer,sizeof(struct clientRequestStruct))){return false;}
365  memcpy(zmq_msg_data(&buffer),request,sizeof(struct clientRequestStruct));
366 
367  //send buffer
368  if(!zmqSend(&buffer,fSockets[socket],0))
369  {
370  zmq_msg_close(&buffer);
371  return false;
372  }
373  zmq_msg_close(&buffer);
374  return true;
375 }
376 
383 bool AliZMQManager::Send(long message,storageSockets socket)
384 {
385  zmq_msg_t buffer;
386  if(!zmqInit(&buffer,sizeof(long))){return false;}
387  memcpy(zmq_msg_data(&buffer),&message,sizeof(long));
388  if(!zmqSend(&buffer,fSockets[socket],0))
389  {
390  zmq_msg_close(&buffer);
391  return false;
392  }
393 
394  zmq_msg_close(&buffer);
395  return true;
396 }
397 
404 bool AliZMQManager::Send(bool message,storageSockets socket)
405 {
406  zmq_msg_t buffer;
407  if(!zmqInit(&buffer,sizeof(bool))){return false;}
408  memcpy(zmq_msg_data(&buffer),&message,sizeof(bool));
409  if(!zmqSend(&buffer,fSockets[socket],0))
410  {
411  zmq_msg_close(&buffer);
412  return false;
413  }
414  zmq_msg_close(&buffer);
415  return true;
416 }
417 
424 bool AliZMQManager::Send(AliESDEvent *event, storageSockets socket)
425 {
426  if(!event){cout<<"MANAGER -- no event"<<endl;return false;}
427 
428  TMessage tmess(kMESS_OBJECT);
429  tmess.Reset();
430  tmess.WriteObject(event);
431  TMessage::EnableSchemaEvolutionForAll(kTRUE);
432 
433  int bufsize = tmess.BufferSize();
434  zmq_msg_t buffer;
435  if(!zmqInit(&buffer,bufsize)){return false;}
436  memcpy(zmq_msg_data(&buffer),tmess.Buffer(),bufsize);
437  if(!zmqSend(&buffer,fSockets[socket],0))
438  {
439  zmq_msg_close(&buffer);
440  return false;
441  }
442  zmq_msg_close(&buffer);
443  return true;
444 }
445 
456 bool AliZMQManager::SendAsXml(AliESDEvent *event,storageSockets socket)
457 {
458  // to be tested from online reconstruction !!
459 
460  cout<<"SENDING AS XML"<<endl;
461  stringstream bufferStream;
462  bufferStream << "<?xml version=\"1.0\" encoding=\"utf-8\"?>"<<endl;
463  bufferStream << "<ESD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" << endl;
464 
465  for(int i=0;i<event->GetNumberOfTracks();i++)
466  {
467  AliESDtrack *track = event->GetTrack(i);
468  AliKalmanTrack *ITStrack = track->GetITStrack();
469  const AliTrackPointArray *array = track->GetTrackPointArray();
470 
471  bufferStream << "\t<track mass=\""<<track->GetMass()<<"\"";
472  // bufferStream << "\t<track esdpid=\""<<track->GetESDpid();
473  bufferStream << "\t pid=\""<<track->PID()<<"\"";
474  bufferStream << "\t energy=\""<<track->E()<<"\"";
475  bufferStream << "\t volumeID=\""<<array->GetVolumeID()<<"\">" <<endl;
476 
477  if(array)
478  {
479  const float *x = array->GetX();
480  const float *y = array->GetY();
481  const float *z = array->GetZ();
482  int n = array->GetNPoints();
483 
484  for(int j=0;j<n;j++)
485  {
486  bufferStream <<"\t\t<point volumeID=\""<<array->GetVolumeID()[j]<<"\">"<<endl;
487  bufferStream <<"\t\t\t<x>"<< x[j] <<"</x>"<<endl;
488  bufferStream <<"\t\t\t<y>"<< y[j] <<"</y>"<<endl;
489  bufferStream <<"\t\t\t<z>"<< z[j] <<"</z>"<<endl;
490  bufferStream <<"\t\t</point>"<<endl;
491  }
492  }
493  else cout<<"no array"<<endl;
494 
495  bufferStream << "\t</track>"<<endl;
496  }
497 
498  bufferStream << "</ESD>"<<endl;
499 
500  string bufferString = bufferStream.str();
501 
502  zmq_msg_t buffer;
503  if(!zmqInit(&buffer,bufferString.size())){return false;}
504  memcpy (zmq_msg_data(&buffer), bufferString.data(), bufferString.size());
505 
506  if(!zmqSend(&buffer,fSockets[socket],0))
507  {
508  zmq_msg_close(&buffer);
509  return false;
510  }
511  zmq_msg_close(&buffer);
512  return true;
513 }
514 
527 bool AliZMQManager::Get(vector<serverListStruct>* &result,storageSockets socket)
528 {
529  //get size of the incomming message
530  zmq_msg_t buffer;
531  zmqInit(&buffer);
532  if(!zmqRecv(&buffer,fSockets[socket],0))
533  {
534  cout<<"MANAGER -- couldn't receive number of records inside GetServerListVector."<<endl;
535  zmq_msg_close(&buffer);
536  return false;
537  }
538  int numberOfRecords;
539  memcpy(&numberOfRecords,&buffer,sizeof(int));
540  cout<<"MANAGER -- number of records:"<<numberOfRecords<<endl;
541  //send empty message just to keep req-rep order:
542  if(!zmqSend(&buffer,fSockets[socket],0))
543  {
544  cout<<"MANAGER -- couldn't send message inside GetServerListVector. This may cause seriouse problems!"<<endl;
545  zmq_msg_close(&buffer);
546  return false;
547  }
548 
549  //get list of events
550  if(!zmqRecv(&buffer,fSockets[socket],0))
551  {
552  zmq_msg_close(&buffer);
553  return false;
554  }
555 
556  if(numberOfRecords==0){
557  cout<<"MANAGER -- list is empty"<<endl;
558  result = nullptr;
559  zmq_msg_close(&buffer);
560  return false;
561  }
562  else
563  {
564  // read data from buffer:
565  void *tmp = zmq_msg_data(&buffer);
566 
567  // vector's range constructor rebuilding vector from void*
568  vector<serverListStruct> tmpVector(static_cast<serverListStruct*>(tmp),
569  static_cast<serverListStruct*>(tmp)+numberOfRecords);
570 
571  cout<<"MANAGER -- size of vector:"<<tmpVector.size()<<endl;
572 
573  // create pointer to this vector:
574  result = new vector<serverListStruct>(tmpVector);
575 
576  cout<<"MANAGER -- size of vector (from pointer:)"<<result->size()<<endl;
577 
578  //delete tmpVector; ?
579  zmq_msg_close(&buffer);
580  return true;
581  }
582 }
583 
596 bool AliZMQManager::Get(vector<string100>* &result,storageSockets socket)
597 {
598  //get size of the incomming message
599  zmq_msg_t buffer;
600  zmqInit(&buffer);
601  if(!zmqRecv(&buffer,fSockets[socket],0))
602  {
603  cout<<"MANAGER -- couldn't receive number of records inside Get set<string>."<<endl;
604  zmq_msg_close(&buffer);
605  return false;
606  }
607  int numberOfRecords;
608  memcpy(&numberOfRecords,&buffer,sizeof(int));
609  cout<<"MANAGER -- number of records:"<<numberOfRecords<<endl;
610  //send empty message just to keep req-rep order:
611  if(!zmqSend(&buffer,fSockets[socket],0))
612  {
613  cout<<"MANAGER -- couldn't send message inside Get set<string>. This may cause seriouse problems!"<<endl;
614  zmq_msg_close(&buffer);
615  return false;
616  }
617 
618  //get list of events
619  if(!zmqRecv(&buffer,fSockets[socket],0))
620  {
621  zmq_msg_close(&buffer);
622  return false;
623  }
624 
625  if(numberOfRecords==0){
626  cout<<"MANAGER -- list is empty"<<endl;
627  result = nullptr;
628  zmq_msg_close(&buffer);
629  return false;
630  }
631  else
632  {
633  // read data from buffer:
634  void* tmp = zmq_msg_data(&buffer);
635 
636  // vector's range constructor rebuilding vector from void*
637  vector<string100> newVector(static_cast<string100*>(tmp),
638  static_cast<string100*>(tmp)+numberOfRecords);
639 
640  cout<<"MANAGER -- size of vector:"<<newVector.size()<<endl;
641 
642  cout<<"MANAGER -- received vector:"<<endl;
643  vector<string100>::iterator it;
644  for (it = newVector.begin(); it != newVector.end(); ++it)
645  {
646  cout<<(*it).data<<endl;
647  }
648 
649  // create pointer to this vector:
650  result = new vector<string100>(newVector);
651  cout<<"MANAGER -- size of vector (from pointer:)"<<result->size()<<endl;
652 
653  //delete tmpSet; ?
654  zmq_msg_close(&buffer);
655  return true;
656  }
657 }
658 
670 bool AliZMQManager::Get(AliESDEvent* &result, storageSockets socket)
671 {
672  //reveive buffer
673  zmq_msg_t buffer;
674  if(!zmqInit(&buffer)){return false;}
675  if(!zmqRecv(&buffer,fSockets[socket],0))
676  {
677  zmq_msg_close(&buffer);
678  return false;
679  }
680 
681  //read buffer to TMessage
682  TBufferFile *mess = new TBufferFile(TBuffer::kRead,
683  zmq_msg_size(&buffer)+sizeof(UInt_t),
684  zmq_msg_data(&buffer));
685  mess->InitMap();
686  mess->ReadClass();// get first the class stored in message
687  mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
688  mess->ResetMap();
689 
690  //read ESDEvent from TMessage
691  AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
692  if (data)
693  {
694  cout<<"MANAGER -- received valid event"<<endl;
695  data->GetStdContent();
696  cout<<"MANAGER -- reading std content:"<<data->GetEventNumberInFile()<<endl;
697  zmq_msg_close(&buffer);
698  result = data;
699  return true;
700  }
701  else
702  {
703  zmq_msg_close(&buffer);
704  result=0;
705  return false;
706  }
707 }
708 
720 bool AliZMQManager::Get(struct serverRequestStruct* &result, storageSockets socket)
721 {
722  zmq_msg_t buffer;
723  if(!zmqInit(&buffer)){return false;}
724  if(!zmqRecv(&buffer,fSockets[socket],0))
725  {
726  zmq_msg_close(&buffer);
727  return false;
728  }
729  else
730  {
731  result = new struct serverRequestStruct(*(static_cast<struct serverRequestStruct*>(zmq_msg_data(&buffer))));
732 
733  cout<<"MANAGER -- received server request:"<<result->messageType<<"\t"<<result->runNumber[0]<<endl;
734  zmq_msg_close(&buffer);
735  return true;
736  }
737 }
738 
750 bool AliZMQManager::Get(struct clientRequestStruct* &result, storageSockets socket)
751 {
752  zmq_msg_t buffer;
753  if(!zmqInit(&buffer)){return false;}
754  if(!zmqRecv(&buffer,fSockets[socket],0))
755  {
756  zmq_msg_close(&buffer);
757  return false;
758  }
759  else
760  {
761  result = new struct clientRequestStruct(*(static_cast<struct clientRequestStruct*>(zmq_msg_data(&buffer))));
762  cout<<"MANAGER -- received client request:"<<result->messageType<<endl;
763  zmq_msg_close(&buffer);
764  return true;
765  }
766 }
767 
774 bool AliZMQManager::Get(long *result, storageSockets socket)
775 {
776  zmq_msg_t buffer;
777  if(!zmqInit(&buffer)){return false;}
778  if(!zmqRecv(&buffer,fSockets[socket],0))
779  {
780  zmq_msg_close(&buffer);
781  return false;
782  }
783  else
784  {
785  memcpy(result,&buffer,sizeof(bool));
786  zmq_msg_close(&buffer);
787  return true;
788  }
789 }
790 
797 bool AliZMQManager::Get(bool *result, storageSockets socket)
798 {
799  zmq_msg_t buffer;
800 
801  if(!zmqInit(&buffer)){return false;}
802  if(!zmqRecv(&buffer,fSockets[socket],0))
803  {
804  zmq_msg_close(&buffer);
805  return false;
806  }
807  else
808  {
809  memcpy(result,&buffer,sizeof(long));
810  zmq_msg_close(&buffer);
811  return true;
812  }
813 }
814 
822 void AliZMQManager::RecreateSocket(storageSockets socket)
823 {
824  cout<<"MANAGER -- recreating socket:"<<socket<<endl;
825  zmq_close(fSockets[socket]);
826  CreateSocket(socket);
827 }
828 
829 // ZMQ methods wrappers:
830 bool AliZMQManager::zmqInit(zmq_msg_t *msg,size_t size)
831 {
832  if(size==0){
833  if(zmq_msg_init(msg) != 0){
834  if(zmq_errno() != EAGAIN){
835  cout<<"MANAGER -- "<<zmq_strerror(zmq_errno())<<endl;
836  return false;
837  }
838  }
839  }
840  else{
841  if(zmq_msg_init_size(msg,size) != 0){
842  if(zmq_errno() != EAGAIN){
843  cout<<"MANAGER -- "<<zmq_strerror(zmq_errno())<<endl;
844  return false;
845  }
846  }
847  }
848  return true;
849 }
850 
851 bool AliZMQManager::zmqSend(zmq_msg_t *msg,void *socket,int flags)
852 {
853  if(zmq_msg_send(msg,socket,flags) == -1){
854  if(zmq_errno() != EAGAIN) // ignore timeout problems
855  {
856  if(zmq_errno() == EFSM)// cannot be accomplished in current state
857  {}
858  cout<<"MANAGER -- zmqSend -- "<<zmq_strerror(zmq_errno())<<endl;
859  return false;
860  }
861  }
862  return true;
863 }
864 
865 bool AliZMQManager::zmqRecv(zmq_msg_t *msg,void *socket,int flags)
866 {
867  if(zmq_msg_recv(msg,socket,flags) == -1){
868  cout<<"MANAGER -- zmqRecv -- "<<zmq_strerror(zmq_errno())<<endl;
869  return false;
870  }
871  return true;
872 }
873 
874 
875 
static AliZMQManager * GetInstance()
AliTPCfastTrack * track
TObjArray * array
Definition: AnalyzeLaser.C:12
static AliZMQManager * fManagerInstance
single instance of AliZMQManager
Definition: AliZMQManager.h:86
ZMQ communication manager.
Definition: AliZMQManager.h:57
void CreateSocket(storageSockets socket)
bool Send(std::vector< serverListStruct > list, storageSockets socket)
bool zmqRecv(zmq_msg_t *msg, void *socket, int flags)
void RecreateSocket(storageSockets socket)
bool zmqInit(zmq_msg_t *msg, size_t size=0)
bool zmqSend(zmq_msg_t *msg, void *socket, int flags)
bool SendAsXml(AliESDEvent *event, storageSockets socket)
bool Get(std::vector< serverListStruct > *&result, storageSockets socket)