AliPhysics  85eb05e (85eb05e)
GridWatch.C
Go to the documentation of this file.
1 
12 #ifndef __CINT__
13 # include <TString.h>
14 # include <TGrid.h>
15 # include <TSystem.h>
16 # include <TObjArray.h>
17 # include <iostream>
18 # include <fstream>
19 # include <TError.h>
20 # include <TDatime.h>
21 # include <TEnv.h>
22 #else
23 class TString;
24 #endif
25 #include <TArrayI.h>
26 
38  const TString& ext,
39  Bool_t merge=false)
40 {
41  return TString::Format("%s%s.%s", name.Data(),
42  (merge ? "_merge" : ""), ext.Data());
43 }
55  const TString& ext,
56  Bool_t merge=false)
57 {
58  // TSystem::AccessPathName return false if file is there
59  return !gSystem->AccessPathName(CacheFileName(name, ext, merge));
60 }
69 void RemoveCacheFile(const TString& name,
70  const TString& ext,
71  Bool_t merge=false)
72 {
73  gSystem->Unlink(CacheFileName(name, ext, merge));
74 }
75 
88  const TString& ext,
89  bool merge=false)
90 {
91  TString fn = TString::Format("%s%s.%s", name.Data(),
92  (merge ? "_merge" : ""), ext.Data());
93  std::ifstream in(fn.Data());
94  if (!in) {
95  Error("ReadCacheFile", "Failed to open %s", fn.Data());
96  return 0;
97  }
98  TString ln;
99  ln.ReadLine(in);
100  in.close();
101 
102  if (ln.IsNull()) return 0;
103  return ln.Tokenize(" \t");
104 }
105 
116 TObjArray* ReadJobIDs(const TString& name, bool merge=false)
117 {
118  return ReadCacheFile(name, "jobid", merge);
119 }
120 
131 TObjArray* ReadStages(const TString& name, bool merge=false)
132 {
133  return ReadCacheFile(name, "stage", merge);
134 }
135 
146 Bool_t ParseJobIDs(const TObjArray* jobIds, TArrayI& ret)
147 {
148  if (!jobIds) return false;
149 
150  Int_t n = jobIds->GetEntries();
151  ret.Set(n);
152  ret.Reset(-1);
153 
154  for (Int_t i = 0; i < n; i++) {
155  TObjString* id = static_cast<TObjString*>(jobIds->At(i));
156  const TString& s = id->String();
157  ret.SetAt(s.Atoi(), i);
158  }
159  return true;
160 }
161 
172 Bool_t ParseState(const TString& status, TString& out)
173 {
174  switch (status[0]) {
175  case 'D': out = "DONE"; break;
176  case 'E': out = "ERROR"; break;
177  case 'R': out = "RUNNING"; break;
178  case 'W': out = "WAITING"; break;
179  case 'O': out = "WAITING_QUOTA"; break;
180  case 'A': out = "ASSIGNED"; break;
181  case 'S': out = "STARTED" ; break;
182  case 'I': out = "INSERTING"; break;
183  case 'K': out = "KILLED"; break;
184  default: out = "UNKNOWN"; return false;
185  }
186  if (status[1] != '\0' &&
187  (status[0] != 'O' || status[0] != 'S')) {
188  out.Append("_");
189  switch (status[1]) {
190  case 'S': out.Append(status[0] == 'E' ? "SUBMIT" : "SPLIT"); break;
191  case 'X': out.Append("EXPIRED"); break;
192  case 'A': out.Append("ASSIGNING"); break;
193  case 'E': out.Append("EXECUTING"); break;
194  case 'V':
195  if (status[0] == 'S') out = "SAVING";
196  else out.Append("VALIDATING");
197  break;
198  case 'd':
199  if (status[0] == 'I') {
200  out = "INTERACTIVE_IDLE";
201  break;
202  } // Fall through on else
203  case 'a':
204  if (status[0] == 'I') {
205  out = "INTERACTIVE_USED";
206  break;
207  } // Fall through on else
208  default: out.Append("UNKNOWN"); return false;
209  }
210  if (status[2] != '\0') {
211  switch (status[2]) {
212  case 'V': if (status[0] == 'E') out.ReplaceAll("SUBMIT", "SAVING");
213  break;
214  default: out.Append("_UNKNOWN");
215  }
216  }
217  }
218  return true;
219 }
220 
229 {
230  tmp = "gridMonitor";
231  FILE* fp = gSystem->TempFileName(tmp);
232 
233 #if 0
234  // Here, we'd ideally use TGrid::Ps but that doesn't work, so we use
235  // the shell instead.
236  gSystem->RedirectOutput(fn);
237  gGrid->Command("ps -Ax");
238  gGrid->Stdout();
239  gSystem->RedirectOutput(0);
240  gGrid->Stderr();
241  fclose(fp);
242 #else
243  fclose(fp);
244 
245  // Printf("Using gbbox ps -Ax >> %s", tmp.Data());
246  gSystem->Exec(Form("gbbox ps -Ax >> %s", tmp.Data()));
247 #endif
248  return true;
249 }
250 
262 {
263  out = "MISSING";
264 
265  TString fn;
266  GridPs(fn);
267 
268  std::ifstream in(fn.Data());
269 
270  while (!in.eof()) {
271  TString l;
272  l.ReadLine(in);
273  if (in.bad()) break;
274 
275  TObjArray* tokens = l.Tokenize(" \t");
276  if (tokens->GetEntries() < 2) break;
277 
278  //TString user = tokens->At(0)->GetName();
279  TString sjid = tokens->At(1)->GetName(); // Job ID
280  TString stat = tokens->At(2)->GetName(); // State
281  Int_t jid = sjid.Atoi();
282 
283  if (jid != jobId) continue;
284 
285  ParseState(stat, out);
286  break;
287  }
288 
289  in.close();
290  gSystem->Unlink(fn);
291 
292  return true;
293 }
294 
305 Bool_t GetJobStates(const TArrayI& jobs, TObjArray& states)
306 {
307  Int_t n = jobs.GetSize();
308  states.Expand(n);
309  for (Int_t i = 0; i < n; i++) {
310  TObjString* s = static_cast<TObjString*>(states.At(i));
311  if (!s) states.AddAt(s = new TObjString(""), i);
312  s->SetString("MISSING");
313  }
314 
315  TString fn;
316  GridPs(fn);
317 
318  std::ifstream in(fn.Data());
319 
320  while (!in.eof()) {
321  TString l;
322  l.ReadLine(in);
323  if (in.bad()) break;
324  if (l.IsNull()) continue;
325 
326  TObjArray* tokens = l.Tokenize(" \t");
327  if (tokens->GetEntries() < 3) {
328  Warning("GetJobStates", "Got too few tokens (%d): %s",
329  tokens->GetEntries(), l.Data());
330  tokens->Print();
331  break;
332  }
333 
334  //TString user = tokens->At(0)->GetName();
335  TString sjid = tokens->At(1)->GetName(); // Job ID
336  TString stat = tokens->At(2)->GetName(); // State
337  Int_t jid = sjid.Atoi();
338 
339  for (Int_t i = 0; i < n; i++) {
340  if (jid != jobs.At(i)) continue;
341  TObjString* s = static_cast<TObjString*>(states.At(i));
342  TString out;
343  if (!ParseState(stat, out)) continue;
344  s->SetString(out);
345  }
346  }
347 
348  in.close();
349  gSystem->Unlink(fn);
350 
351  return true;
352 }
353 
361 {
362  Int_t ret = gSystem->Exec("alien-token-info > /dev/null 2>&1");
363  if (ret != 0) {
364  Printf("=== AliEn token not valid");
365  return false;
366  }
367  return true;
368 }
369 
376 #if 0
377 void RefreshAlienToken(UInt_t, Bool_t f=false)
378 {}
379 #else
380 void RefreshAlienToken(UInt_t now, Bool_t force=false)
381 {
382  Bool_t renew = force;
383  if (!renew && !CheckAlienToken()) renew = true;
384 
385  if (!renew) {
386  TString l = gSystem->GetFromPipe(Form("cat /tmp/gclient_token_%d",
387  gSystem->GetUid()));
388  TObjArray* lines = l.Tokenize("\n");
389  TObjString* sline = 0;
390  UInt_t expire = 0;
391  TIter next(lines);
392  while ((sline = static_cast<TObjString*>(next()))) {
393  TString& line = sline->String();
394  if (!line.BeginsWith("Expiretime")) continue;
395 
396  Size_t eq = line.Index("=");
397  TString sdatime = line(eq+2, line.Length()-eq-2);
398  expire = sdatime.Atoi();
399  break;
400  }
401  lines->Delete();
402  // If the expiration date/time has passed or is less than 30 min
403  // away, we refresh
404  Int_t diff = (expire - now);
405  if (now > expire || diff < 30*60) renew = true;
406 
407  Printf("=== Now: %d, Expires: %d, in %03d:%02d:%02d -> %s",
408  now, expire, diff/60/60, (diff/60 % 60), (diff % 60),
409  (renew ? "renew" : "nothing"));
410 
411  }
412 
413  if (!renew) return;
414 
415  // Reset the start time
416  Printf("=== Refreshing AliEn token");
417  gSystem->Exec("alien-token-init");
418  Printf("=== Done refreshing AliEn token");
419 }
420 #endif
421 
422 
436  TObjArray* stages,
437  Int_t delay,
438  Bool_t batch)
439 {
440  if (!CheckAlienToken()) return false;
441  // Bool_t stopped = false;
442  TFileHandler h(0, 0x1);
443  // RefreshAlienToken(0, true);
444  do {
445  Bool_t allDone = true;
446  TDatime t;
447  Printf("--- %4d/%02d/%02d %02d:%02d:%02d [Press enter to pause] ---",
448  t.GetYear(), t.GetMonth(), t.GetDay(),
449  t.GetHour(), t.GetMinute(), t.GetSecond());
450  UInt_t now = t.Convert(true);
451 
452  TObjArray states;
453  GetJobStates(jobs, states);
454 
455  Int_t missing = 0;
456  Int_t total = jobs.GetSize();
457  // Bool_t allAccounted = false;
458  for (Int_t i = 0; i < total; i++) {
459  Int_t job = jobs.At(i);
460 
461  if (job < 0) continue;
462 
463  TObjString* obj = static_cast<TObjString*>(states.At(i));
464  const TString& state = obj->String();
465 
466  if (state.BeginsWith("ERROR_"))
467  jobs.SetAt(-1, i);
468  else if (state.EqualTo("MISSING"))
469  missing++;
470  else if (!state.EqualTo("DONE"))
471  allDone = false;
472 
473 
474  Printf(" %d(%s)=%s", job, stages->At(i)->GetName(), state.Data());
475 
476  }
477  RefreshAlienToken(now);
478 
479  if (allDone) break;
480  if (missing >= total) {
481  Error("GetJobStates", "Info on all jobs missing");
482  break;
483  }
484  if (!batch) {
485  if (gSystem->Select(&h, 1000*delay)) {
486  // Got input on std::cin
487  std::string l;
488  std::getline(std::cin, l);
489  std::cout << "Do you want to terminate now [yN]? " << std::flush;
490  std::getline(std::cin, l);
491  if (l[0] == 'y' || l[0] == 'Y') {
492  // stopped = true;
493  break;
494  }
495  }
496  }
497  else
498  gSystem->Sleep(1000*delay);
499 
500  //
501  } while (true);
502 
503  return true;
504 }
514 void GridWatch(const TString& name, Bool_t batch=false, UShort_t delay=5*60)
515 {
516 #if 1
517  // We use command line tools instead of ROOT interface - which is
518  // broken so badly that it's hard to believe it ever worked.
519  gEnv->SetValue("XSec.GSI.DelegProxy", "2");
520  TGrid::Connect("alien:///");
521  if (!gGrid) {
522  Error("GridWatch", "Failed to connect to the Grid");
523  return;
524  }
525 #endif
526 
527  TObjArray* jobIDs = ReadJobIDs(name, false);
528  TObjArray* stages = ReadStages(name, false);
529 
530  if (!jobIDs || !stages) return;
531 
532  TArrayI jobs;
533  if (!ParseJobIDs(jobIDs, jobs)) return;
534 
535  gSystem->Sleep(10*1000);
536  if (!(CheckCacheFile(name, "jobid", true) &&
537  CheckCacheFile(name, "stage", true)))
538  if (!WaitForJobs(jobs, stages, delay, batch)) return;
539 
540  delete jobIDs;
541  delete stages;
542 
543  // return;
544  do {
545  if (!CheckCacheFile(name, "jobid", true) &&
546  !CheckCacheFile(name, "stage", true)) {
547  if (!CheckAlienToken()) return;
548  Printf("Now executing terminate");
549  gSystem->Exec("aliroot -l -b -q Terminate.C");
550  gSystem->Sleep(10*1000);
551  }
552 
553  Printf("Reading job ids");
554  jobIDs = ReadJobIDs(name, true);
555  stages = ReadStages(name, true);
556 
557  if (!ParseJobIDs(jobIDs, jobs)) {
558  Error("GridWatch", "Failed to parse job ids %s",
559  CacheFileName(name,"jobid",true).Data());
560  return;
561  }
562 
563  if (!WaitForJobs(jobs, stages, delay, batch)) return;
564 
565  Bool_t allFinal = true;
566  for (Int_t i = 0; i < jobs.GetSize(); i++) {
567  if (jobs.At(i) < 0) continue;
568 
569  const TString& s = static_cast<TObjString*>(stages->At(i))->String();
570  if (!s.BeginsWith("final_")) allFinal = false;
571  }
572 
573  delete jobIDs;
574  delete stages;
575 
576  Printf("All jobs in final stage");
577  if (allFinal) break;
578 
579  RemoveCacheFile(name, "jobid", true);
580  RemoveCacheFile(name, "stage", true);
581  } while (true);
582 
583  Printf("Finished");
584 }
585 //
586 // EOF
587 //
588 
Bool_t WaitForJobs(TArrayI &jobs, TObjArray *stages, Int_t delay, Bool_t batch)
Definition: GridWatch.C:435
Bool_t ParseState(const TString &status, TString &out)
Definition: GridWatch.C:172
void RemoveCacheFile(const TString &name, const TString &ext, Bool_t merge=false)
Definition: GridWatch.C:69
TSystem * gSystem
TObjArray * ReadJobIDs(const TString &name, bool merge=false)
Definition: GridWatch.C:116
TString CacheFileName(const TString &name, const TString &ext, Bool_t merge=false)
Definition: GridWatch.C:37
TObjArray * ReadStages(const TString &name, bool merge=false)
Definition: GridWatch.C:131
void GridWatch(const TString &name, Bool_t batch=false, UShort_t delay=5 *60)
Definition: GridWatch.C:514
int Int_t
Definition: External.C:63
Bool_t GridPs(TString &tmp)
Definition: GridWatch.C:228
unsigned int UInt_t
Definition: External.C:33
Bool_t GetJobStates(const TArrayI &jobs, TObjArray &states)
Definition: GridWatch.C:305
Bool_t Data(TH1F *h, Double_t *rangefit, Bool_t writefit, Double_t &sgn, Double_t &errsgn, Double_t &bkg, Double_t &errbkg, Double_t &sgnf, Double_t &errsgnf, Double_t &sigmafit, Int_t &status)
TObjArray * ReadCacheFile(const TString &name, const TString &ext, bool merge=false)
Definition: GridWatch.C:87
Bool_t ParseJobIDs(const TObjArray *jobIds, TArrayI &ret)
Definition: GridWatch.C:146
Bool_t CheckCacheFile(const TString &name, const TString &ext, Bool_t merge=false)
Definition: GridWatch.C:54
unsigned short UShort_t
Definition: External.C:28
Bool_t GetJobState(Int_t jobId, TString &out)
Definition: GridWatch.C:261
bool Bool_t
Definition: External.C:53
Bool_t CheckAlienToken()
Definition: GridWatch.C:360
void RefreshAlienToken(UInt_t now, Bool_t force=false)
Definition: GridWatch.C:380