AliRoot Core  v5-06-30 (35d6c57)
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
AliCommunicationThread.cxx
Go to the documentation of this file.
2 #include "AliZMQManager.h"
3 
4 #include <iostream>
5 #include <fstream>
6 
7 using namespace std;
8 
10  fFinished(false),
11  fManager(onlineReconstructionManager),
12  fCommunicationThread(0)
13 {
14  //create two-way communication thread
15  fCommunicationThread = new TThread("fCommunicationThread",Dispatch,(void*)this);
16  fCommunicationThread->Run();
17 }
18 
20 {
22 }
23 
25 {
27  {
28  fFinished=true;
29  fCommunicationThread->Join();
30  fCommunicationThread->Kill();
31  }
32 }
33 
35 {
36  AliZMQManager *eventManager = AliZMQManager::GetInstance();
37  storageSockets socket = CLIENT_COMMUNICATION_REP;
38  eventManager->CreateSocket(socket);
39 
40  struct clientRequestStruct *request;
41  struct clientRequestStruct *response = new struct clientRequestStruct;
42 
43  cout<<"COMMUNICATION -- Communication stated"<<endl;
44 
45  // mutex mtx;
46  bool receiveStatus = false;
47  bool sendStatus = false;
48 
49  while(!fFinished)
50  {
51  cout<<"COMMUNICATION -- waiting for requests"<<endl;
52 
53  do { // try receive requests until success
54  receiveStatus = eventManager->Get(request,socket);
55 // sleep(1);
56  } while (receiveStatus == false);
57 
58  cout<<"COMMUNICATION -- received request"<<endl;
59  switch(request->messageType)
60  {
61  case REQUEST_CONNECTION:
62  sendStatus = eventManager->Send((long)fManager->fConnectionStatus,socket);
63  break;
64  case REQUEST_RECEIVING:
65  sendStatus = eventManager->Send((long)fManager->fReceivingStatus,socket);
66  break;
67  case REQUEST_SAVING:
68  sendStatus = eventManager->Send((long)fManager->fSavingStatus,socket);
69  break;
70  case REQUEST_CURRENT_SIZE:
71  sendStatus = eventManager->Send((long)fManager->fCurrentStorageSize,socket);
72  break;
73  case REQUEST_GET_PARAMS:
74  response->maxStorageSize = fManager->fMaximumStorageSize;
75  response->maxOccupation = fManager->fStorageOccupationLevel;
76  response->removeEvents = fManager->fRemoveEventsPercentage;
77  response->eventsInChunk = fManager->fNumberOfEventsInFile;
78 
79  sendStatus = eventManager->Send(response,socket);
80  break;
81  case REQUEST_SET_PARAMS:
82  SetStorageParams(request->maxStorageSize,
83  request->maxOccupation,
84  request->removeEvents,
85  request->eventsInChunk);
86 
87  fManager->fMaximumStorageSize = request->maxStorageSize;
88  fManager->fStorageOccupationLevel = request->maxOccupation;
89  fManager->fRemoveEventsPercentage = request->removeEvents;
90  fManager->fNumberOfEventsInFile = request->eventsInChunk;
91 
92  sendStatus = eventManager->Send(true,socket);
93  break;
94  default:
95  cout<<"COMMUNICATION -- unknown request"<<endl;
96  sendStatus = false;
97  break;
98  }
99  if(sendStatus == false)
100  {
101  eventManager->RecreateSocket(socket);// if couldn't send, recreate socket to be able to receive messages (currently socket is in SEND state)
102  }
103 
104  delete request;
105 
106  }
107 }
108 
109 void AliCommunicationThread::SetStorageParams(int maxStorageSize,int maxOccupation,int removeEvents,int eventsInChunk)
110 {
111  cout<<maxStorageSize<<endl<<maxOccupation<<endl<<removeEvents<<endl<<eventsInChunk<<endl;
112 
113  TThread::Lock();
114  ifstream configFile (GetConfigFilePath());
115  ofstream tmpFile("tmpFile.bla");
116 
117  if (configFile.is_open())
118  {
119  string line;
120  string tmpLine;
121  int from,to;
122  while(configFile.good())
123  {
124  getline(configFile,line);
125  from = line.find("\"")+1;
126  to = line.find_last_of("\"");
127  tmpLine = line;
128  if(line.find("MAX_SIZE=")==0){
129  tmpLine = Form("MAX_SIZE=\"%d\"",maxStorageSize);
130  }
131  else if(line.find("MAX_OCCUPATION=")==0){
132  tmpLine = Form("MAX_OCCUPATION=\"%d\"",maxOccupation);
133  }
134  else if(line.find("REMOVE_PERCENT=")==0){
135  tmpLine = Form("REMOVE_PERCENT=\"%d\"",removeEvents);
136  }
137  else if(line.find("EVENTS_IN_FILE=")==0){
138  tmpLine = Form("EVENTS_IN_FILE=\"%d\"",eventsInChunk);
139  }
140  tmpLine += "\n";
141  tmpFile << tmpLine;
142  }
143  if(configFile.eof()){configFile.clear();}
144  configFile.close();
145  tmpFile.close();
146  rename("tmpFile.bla",GetConfigFilePath());
147  }
148  else{cout<<"CLIENT -- Unable to open config file"<<endl;}
149  TThread::UnLock();
150 }
static AliZMQManager * GetInstance()
ZMQ communication manager.
Definition: AliZMQManager.h:57
AliCommunicationThread(AliStorageClientThread *onlineReconstructionManager)
void CreateSocket(storageSockets socket)
bool Send(std::vector< serverListStruct > list, storageSockets socket)
AliStorageClientThread * fManager
void SetStorageParams(int maxStorageSize, int maxOccupation, int removeEvents, int eventsInChunk)
static void * Dispatch(void *arg)
void RecreateSocket(storageSockets socket)
bool Get(std::vector< serverListStruct > *&result, storageSockets socket)