AliRoot Core  v5-06-30 (35d6c57)
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
AliEventsCollectorThread.cxx
Go to the documentation of this file.
2 #include "AliZMQManager.h"
3 
4 #include <TSystemDirectory.h>
5 
6 #include <iostream>
7 #include <fstream>
8 
9 using namespace std;
10 
12 fManager(onlineReconstructionManager),
13 fCollectorThread(0),
14 fCurrentFile(0),
15 fDatabase(0),
16 fFinished(false)
17 {
19 
21 
22  // start collecting events in a thread
23  fCollectorThread = new TThread("fCollectorThread",Dispatch,(void*)this);
24  fCollectorThread->Run();
25 }
26 
27 
29 {
31 
32  if(fCurrentFile){
33  fCurrentFile->Close();
34  delete fCurrentFile;
35  }
36  if(fDatabase){delete fDatabase;}
37  if(fManager){delete fManager;}
38 }
39 
41 {
42  if(fCollectorThread){
43  fFinished=true;
44  fCollectorThread->Join();
45  fCollectorThread->Kill();
46  }
47 }
48 
50 {
51  AliZMQManager *eventManager = AliZMQManager::GetInstance();
52  eventManager->CreateSocket(EVENTS_SERVER_SUB);
53 
54  int chunkNumber=0;
55  int previousChunkNumber=-1;
56  int eventsInChunk=0;
57  int previousRunNumber=-1;
58  AliESDEvent *event = NULL;
59  vector<struct eventStruct> eventsToUpdate;
60  struct eventStruct currentEvent;
61 
62  bool receiveStatus = false;
63 
64  while(!fFinished)
65  {
66  cout<<"CLIENT -- waiting for event..."<<endl;
67  receiveStatus = eventManager->Get(event,EVENTS_SERVER_SUB);
68 
69  if(event && receiveStatus)
70  {
71  cout<<"CLIENT -- received event"<<endl;
72  fManager->fReceivingStatus=STATUS_OK;
73 
74  if(event->GetRunNumber() != previousRunNumber)//when new run starts
75  {
76  cout<<"CLIENT -- new run started"<<endl;
77  previousRunNumber = event->GetRunNumber();
78  gSystem->Exec(Form("mkdir -p %s/run%d",fManager->fStoragePath.c_str(),event->GetRunNumber()));
79  chunkNumber=0;
80  eventsInChunk=0;
81 
82  TSystemDirectory dir(Form("%s/run%d",fManager->fStoragePath.c_str(),event->GetRunNumber()),
83  Form("%s/run%d",fManager->fStoragePath.c_str(),event->GetRunNumber()));
84  TList *files = dir.GetListOfFiles();
85  if (files)
86  {
87  TSystemFile *file;
88  string fname;
89  TIter next(files);
90 
91  while ((file=(TSystemFile*)next()))
92  {
93  fname = file->GetName();
94 
95  if (!file->IsDirectory())
96  {
97  int from = fname.find("chunk")+5;
98  int to = fname.find(".root");
99 
100  int maxChunkNumber = atoi(fname.substr(from,to-from).c_str());
101 
102  if(maxChunkNumber > chunkNumber)
103  {
104  chunkNumber = maxChunkNumber;
105  }
106  }
107  }
108  chunkNumber++;
109  }
110  }
111 
112  cout<<"CLIENT -- Received data. Event:"<<event->GetEventNumberInFile()<<"\trun:"<<event->GetRunNumber()<<endl;
113 
114  if(chunkNumber != previousChunkNumber)//when new chunk needs to be created
115  {
116  if(fCurrentFile)
117  {
118  fCurrentFile->Close();
119  delete fCurrentFile;
120  fCurrentFile=0;
121  }
122  for(unsigned int i=0;i<eventsToUpdate.size();i++)
123  {
124  TThread::Lock();
125  fDatabase->UpdateEventPath(eventsToUpdate[i],Form("%s/run%d/chunk%d.root",
126  fManager->fStoragePath.c_str(),
127  event->GetRunNumber(),
128  chunkNumber-1));
129  TThread::UnLock();
130  }
131  eventsToUpdate.clear();
132 
134 
135  fCurrentFile = new TFile(Form("%s/run%d/chunk%d.root", fManager->fStoragePath.c_str(),event->GetRunNumber(),chunkNumber),"recreate");
136 
137  previousChunkNumber = chunkNumber;
138  }
139 
140  //create new directory for this run
141  TDirectory *currentRun;
142  if((currentRun = fCurrentFile->mkdir(Form("run%d",event->GetRunNumber()))))
143  {
144  cout<<"CLIENT -- creating new directory for this run"<<endl;
145  currentRun->cd();
146  }
147  else
148  {
149  cout<<"CLIENT -- opening existing directory for this run"<<endl;
150  fCurrentFile->cd(Form("run%d",event->GetRunNumber()));
151  }
152 
153  if(0 != event->Write(Form("event%d",event->GetEventNumberInFile())))
154  //fCurrentFile->WriteObject(event,Form("event%d",event->GetEventNumberInFile())))//if event was written to file
155  {
156  eventsInChunk++;
157 
158  if(eventsInChunk == fManager->fNumberOfEventsInFile)//if max events number in file was reached
159  {
160  chunkNumber++;
161  eventsInChunk=0;
162  }
163 
164  if(fManager->fSavingStatus!=STATUS_OK){fManager->fSavingStatus=STATUS_OK;}
165  }
166  else if(fManager->fSavingStatus!=STATUS_ERROR){fManager->fSavingStatus=STATUS_ERROR;}
167 
168  // save to event file as well:
169  TFile *eventFile = new TFile(Form("%s/run%d/event%d.root", fManager->fStoragePath.c_str(),event->GetRunNumber(),eventsInChunk),"recreate");
170 
171  if((currentRun = eventFile->mkdir(Form("run%d",event->GetRunNumber()))))
172  {
173  cout<<"CLIENT -- creating new directory for this run"<<endl;
174  currentRun->cd();
175  }
176  else
177  {
178  cout<<"CLIENT -- opening existing directory for this run"<<endl;
179  eventFile->cd(Form("run%d",event->GetRunNumber()));
180  }
181 
182  if(0 == event->Write(Form("event%d",event->GetEventNumberInFile())) &&
183  fManager->fSavingStatus!=STATUS_ERROR)
184  {
185  fManager->fSavingStatus=STATUS_ERROR;
186  }
187  else
188  {
189  eventFile->Close();
190  delete eventFile;
191  TThread::Lock();
192 
193  cout<<"COLLECTOR -- events mask:"<<event->GetTriggerMask()<<endl;
194  cout<<"COLLECTOR -- events mask next 50:"<<event->GetTriggerMaskNext50()<<endl;
195 
196  fDatabase->InsertEvent(event->GetRunNumber(),
197  event->GetEventNumberInFile(),
198  (char*)event->GetBeamType(),
199  event->GetMultiplicity()->GetNumberOfTracklets(),
200  Form("%s/run%d/event%d.root",fManager->fStoragePath.c_str(),
201  event->GetRunNumber(),
202  eventsInChunk),
203  event->GetTriggerMask(),
204  event->GetTriggerMaskNext50()
205  );
206  TThread::UnLock();
207  currentEvent.runNumber = event->GetRunNumber();
208  currentEvent.eventNumber = event->GetEventNumberInFile();
209  eventsToUpdate.push_back(currentEvent);
210  }
211  delete event;event=0;
212  }
213  else
214  {
215  cout<<"CLIENT -- ERROR -- NO DATA!"<<endl;
216  if(fManager->fReceivingStatus!=STATUS_ERROR){fManager->fReceivingStatus=STATUS_ERROR;}
217  }
218  }
219  if(event){delete event;}
220 }
221 
222 
224 {
225  Long64_t totalStorageSize = 0;
226 
227  TSystemDirectory dir(fManager->fStoragePath.c_str(),fManager->fStoragePath.c_str());
228  TList *listOfDirectories = dir.GetListOfFiles();
229 
230  if (!listOfDirectories){
231  cout<<"CLIENT -- Storage directory is empty"<<endl;
232  return 0;
233  }
234  TIter nextDirectory(listOfDirectories);
235  TSystemFile *runDirectory;
236  string directoryName;
237 
238  while ((runDirectory=(TSystemFile*)nextDirectory()))
239  {
240  directoryName=runDirectory->GetName();
241  if (runDirectory->IsDirectory() && directoryName.find("run")==0)
242  {
243  TSystemDirectory dirChunks(Form("%s/%s",fManager->fStoragePath.c_str(),directoryName.c_str()),Form("%s/%s",fManager->fStoragePath.c_str(),directoryName.c_str()));
244  TList *listOfChunks = dirChunks.GetListOfFiles();
245 
246  if(listOfChunks)
247  {
248  TIter nextChunk(listOfChunks);
249  TSystemFile *chunk;
250  string chunkFileName;
251 
252  while((chunk=(TSystemFile*)nextChunk()))
253  {
254  chunkFileName = chunk->GetName();
255  if(!chunk->IsDirectory() && chunkFileName.find("chunk")==0)
256  {
257  TFile *tmpFile = new TFile(Form("%s/%s/%s",fManager->fStoragePath.c_str(),directoryName.c_str(),chunkFileName.c_str()),"read");
258  if(tmpFile)
259  {
260  totalStorageSize+=tmpFile->GetSize();
261  tmpFile->Close();
262  delete tmpFile;
263  }
264  }
265  }
266  if(chunk){delete chunk;}
267  }
268  if(listOfChunks){delete listOfChunks;}
269  }
270  }
271 
272  if(listOfDirectories){delete listOfDirectories;}
273  if(runDirectory){delete runDirectory;}
274 
275  printf("CLIENT -- Total storage size:%lld\t(%.2f MB)\n",totalStorageSize,(float)totalStorageSize/(1000.*1000.));
276 
277  return totalStorageSize;
278 }
279 
281 {
283 
285  {
287  {
288  TThread::Lock();
289  struct eventStruct oldestEvent = fDatabase->GetOldestEvent();
290  string oldestEventPath = fDatabase->GetFilePath(oldestEvent);
291  TThread::UnLock();
292  //remove oldest event
293  cout<<"CLIENT -- Removing old events:"<<oldestEventPath<<endl;
294  gSystem->Exec(Form("rm -f %s",oldestEventPath.c_str()));
295  TThread::Lock();
296  fDatabase->RemoveEventsWithPath(oldestEventPath);
297  TThread::UnLock();
298  }
299  }
300 }
301 
302 
303 
printf("Chi2/npoints = %f\n", TMath::Sqrt(chi2/npoints))
AliStorageClientThread * fManager
static AliZMQManager * GetInstance()
void InsertEvent(int runNumber, int eventNumber, char *system, int multiplicity, char *filePath, ULong64_t triggerMask, ULong64_t triggerMaskNext50)
AliEventsCollectorThread(AliStorageClientThread *onlineReconstructionManager)
ZMQ communication manager.
Definition: AliZMQManager.h:57
void CreateSocket(storageSockets socket)
static void * Dispatch(void *arg)
struct eventStruct GetOldestEvent()
bool UpdateEventPath(struct eventStruct event, const char *newPath)
std::string GetFilePath(struct eventStruct event)
char * fname
void RemoveEventsWithPath(std::string path)
bool Get(std::vector< serverListStruct > *&result, storageSockets socket)