AliPhysics  7baac56 (7baac56)
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ProofRailway.C
Go to the documentation of this file.
1 
12 #ifndef PROOFHELPER_C
13 #define PROOFHELPER_C
14 #include "Railway.C"
15 #ifndef __CINT__
16 # include "OutputUtilities.C"
17 # include "ParUtilities.C"
18 # include <TUrl.h>
19 # include <TString.h>
20 # include <TProof.h>
21 # include <TProofLog.h>
22 # include <TProofDebug.h>
23 # include <AliAnalysisManager.h>
24 # include <TEnv.h>
25 # include <TChain.h>
26 // For SendFile
27 # include <TSystem.h>
28 # include <TSlave.h>
29 # include <TSocket.h>
30 # include <cerrno>
31 #else
32 class TUrl;
33 class TChain;
34 #endif
35 
36 // ===================================================================
86 struct ProofRailway : public Railway
87 {
94  ProofRailway(const TUrl& url, Int_t verbose)
95  : Railway(url, verbose),
96  fExtraLibs(""),
97  fExtraPars(""),
98  fExtraSrcs(""),
99  fUsePars(false),
100  fBasePars(false),
101  fTestBuild(true),
102  fAuxFiles()
103  {
104  fOptions.Add("workers", "N[x]", "Number of workers to use", 0);
105  fOptions.Add("dsname", "NAME", "Make output dataset", "");
106  fOptions.Add("par", "tasks|all", "Use par files", "tasks");
107  fOptions.Add("mode", "default|rec|sim", "AliROOT mode", "default");
108  fOptions.Add("storage", "URL", "Location for external storage", "");
109  fOptions.Add("wrapper", "CMD", "Wrapper command", "");
110  fOptions.Add("clear", "PKGS", "Clear packages ','-separated", "");
111  fOptions.Add("reset", "soft|hard", "Reset cluster", "hard");
112  fOptions.Add("feedback", "Enable feedback mechanism");
113  fOptions.Add("env", "SCRIPT", "Script to set-up environment","-none-");
114  fOptions.Add("offset", "EVENTS", "Skip this number of events", 0);
115  fOptions.Add("testpar", "Test build PARs");
116  if (!fUrl.GetUser() || fUrl.GetUser()[0] == '\0')
117  fUrl.SetUser(gSystem->GetUserInfo()->fUser);
118  fAuxFiles.SetOwner();
119  }
121  : Railway(o),
122  fExtraLibs(""),
123  fExtraPars(""),
124  fExtraSrcs(""),
125  fUsePars(false),
126  fBasePars(false),
127  fTestBuild(true),
128  fAuxFiles()
129  {}
131  {
132  if (&o == this) return *this;
137  fUsePars = o.fUsePars;
138  fBasePars = o.fBasePars;
140  // fAuxFiles;
141  return *this;
142  }
146  virtual ~ProofRailway() {}
160  void UsePar(Bool_t& use)
161  {
162  Bool_t tmp = fUsePars;
163  fUsePars = use;
164  use = tmp;
165  }
173  virtual Bool_t AddIncludePath(const TString& path)
174  {
175  Railway::AddIncludePath(path); // Also do on client
176  TString p(path);
177  if (!p.BeginsWith("-")) {
178  // IF the path does not begin with a -, we assume its a path
179  p.Prepend("-I");
180  }
181  gProof->AddIncludePath(p);
182  return true;
183  }
193  virtual Bool_t LoadLibrary(const TString& name,
194  Bool_t slaves=true,
195  Bool_t forcePar=false)
196  {
197  Bool_t isBase = false;
198  if (!fBasePars) {
199  if (name.EqualTo("STEERBase") ||
200  name.EqualTo("ESD") ||
201  name.EqualTo("AOD") ||
202  name.EqualTo("ANALYSIS") ||
203  name.EqualTo("OADB") ||
204  name.EqualTo("ANALYSISalice"))
205  isBase = true;
206  }
207  if ((!fUsePars || isBase) && !forcePar) {
208  Int_t ret = gSystem->Load(MakeLibraryName(name));
209  if (ret < 0) return false;
210  if (slaves) fExtraLibs.Append(Form(":%s", name.Data()));
211  }
212  else {
213  if (!ParUtilities::Find(name)) {
214  Error("ProofRailway::LoadLibrary", "Failed to find PAR file %s",
215  name.Data());
216  return false;
217  }
218  if (fTestBuild && !ParUtilities::Build(name)) {
219  Error("ProofRailway::LoadLibrary", "Failed to build PAR file %s",
220  name.Data());
221  return false;
222  }
223  if (gProof->UploadPackage(name.Data(), TProof::kRemoveOld) < 0) {
224  Error("ProofRailway::LoadLibrary", "Failed to upload PAR file %s",
225  name.Data());
226  return false;
227  }
228  fExtraPars.Append(Form(":%s", name.Data()));
229  }
230  return true;
231  }
240  virtual Bool_t LoadSource(const TString& name, bool copy=true)
241  {
242  if (!Railway::LoadSource(name, copy)) return false;
243  fExtraSrcs.Append(Form(":%s", gSystem->BaseName(name.Data())));
244  return true;
245  }
251  virtual Bool_t LoadAliROOT()
252  {
253  if (!gSystem->Getenv("ALICE_ROOT")) {
254  Error("ProofRailway::LoadAliROOT", "Local AliROOT not available");
255  return false;
256  }
257 
258  Bool_t tmp = fUsePars;
260  if (!LoadLibrary("STEERBase")) return false;
261  if (!LoadLibrary("ESD")) return false;
262  if (!LoadLibrary("AOD")) return false;
263  if (!LoadLibrary("ANALYSIS")) return false;
264  if (!LoadLibrary("ANALYSISalice")) return false;
265  fUsePars = tmp;
266 
267  return CreateAliROOTPar();
268  }
275  {
276  if (!gSystem->Getenv("ALICE_PHYSICS")) {
277  Error("ProofRailway::LoadAliPhysics",
278  "Local AliPhysics not available");
279  return false;
280  }
281 
282  Bool_t tmp = fUsePars;
284  if (!LoadLibrary("OADB")) return false;
285  fUsePars = tmp;
286 
287  return CreateAliPhysicsPar();
288  }
294  virtual const char* AliROOTParName() const
295  {
296  return "ALIROOT";
297  }
303  virtual const char* AliPhysicsParName() const
304  {
305  return "ALIPHYSICS";
306  }
329  virtual Bool_t CreatePseudoPar(const TString& parName,
330  const TString& env,
331  const TString& setup)
332 
333  {
334  if (fBasePars) return true;
335 
336  TString parFile(Form("%s.par", parName.Data()));
337 
338  // --- Check if we have the drirectory already -------------------
339  if (gSystem->AccessPathName(parName.Data()) == 0) {
340  // Let's remove it to get a clean slate
341  if (gSystem->Exec(Form("rm -rf %s", parName.Data())) != 0) {
342  Error("ProofRailway", "Failed to remove %s", parName.Data());
343  return false;
344  }
345  }
346  // --- Check if the PAR file is there, and remove it if so -------
347  if (gSystem->AccessPathName(parFile.Data()) == 0) {
348  if (gSystem->Unlink(parFile.Data()) != 0) {
349  Error("ProofRailway::CreatePseudoPar", "Failed to remove %s",
350  parFile.Data());
351  return false;
352  }
353  }
354 
355 
356  // Set-up directories
357  if (gSystem->MakeDirectory(parName) < 0) {
358  Error("ProofRailway::CreatePseudoPar", "Could not make directory '%s'",
359  parName.Data());
360  return false;
361  }
362 
363  if (gSystem->MakeDirectory(Form("%s/PROOF-INF", parName.Data()))) {
364  Error("ProofRailway::CreatePseudoPar",
365  "Could not make directory %s/PROOF-INF",
366  parName.Data());
367  return false;
368  }
369 
370  // --- Possible environment script -------------------------------
371  TString envScript = env;
372  if (!envScript.EndsWith(".C") && !envScript.EndsWith(".sh"))
373  envScript = "";
374  Printf("Environment script: %s", envScript.Data());
375 
376  // --- Build script does nothing ---------------------------------
377  std::ofstream b(Form("%s/PROOF-INF/BUILD.sh",parName.Data()));
378  if (!b) {
379  Error("ProofRailway::CreatePseudoPar",
380  "Failed to make BUILD.sh shell script");
381  return false;
382  }
383  b << "#!/bin/sh\n\n";
384  if (envScript.EndsWith(".sh"))
385  b << ". PROOF-INF/" << gSystem->BaseName(envScript.Data()) << "\n";
386  else
387  b << "# echo Nothing to do\n";
388  b << "exit 0\n" << std::endl;
389  b.close();
390  gSystem->Exec(Form("chmod a+x %s/PROOF-INF/BUILD.sh",parName.Data()));
391 
392  if (!envScript.IsNull()) {
393  // If an environment script was specified, copy that to the par
394  if (gSystem->AccessPathName(envScript.Data()) == 0) {
395  // Copy script
396  if (gSystem->Exec(Form("cp %s %s/PROOF-INF/", envScript.Data(),
397  parName.Data())) != 0) {
398  Error("ProofRailway", "Failed to copy %s", envScript.Data());
399  return false;
400  }
401  }
402  else {
403  Warning("CreateALIROOTPar", "Couldn't read %s", envScript.Data());
404  envScript = "";
405  }
406  }
407  // --- Write SETUP script ----------------------------------------
408  std::ofstream s(Form("%s/PROOF-INF/SETUP.C", parName.Data()));
409  if (!s) {
410  Error("ProofRailway::CreatePseudoPar",
411  "Failed to make SETUP.C ROOT script");
412  return false;
413  }
414  s << "Bool_t Load(const char* lib) {"
415  << " TString ln(lib);"
416  << " if (!ln.BeginsWith(\"lib\")) ln.Prepend(\"lib\");\n"
417  << " if (!ln.EndsWith(\".so\")) ln.Append(\".so\");\n"
418  << " TString w = gSystem->Which(gSystem->GetDynamicPath(),ln);\n"
419  << " if (w.IsNull()) { \n"
420  << " Warning(\"Load\",\"%s not found\",ln.Data());"
421  << " return false; }\n"
422  << " Info(\"Load\",\"Trying to load %s\",ln.Data());\n"
423  << " if (gSystem->Load(ln) < 0) return false;\n"
424  << " return true;\n"
425  << "}\n"
426  << std::endl;
427 
428  s << "void SETUP(TList* opts) {\n";
429  if (envScript.IsNull()) {
430  s << env << std::endl;
431  }
432  else if (envScript.EndsWith(".C")) {
433  s << " gROOT->Macro(\"PROOF-INF/" << gSystem->BaseName(envScript.Data())
434  << "\");\n";
435  }
436  s << setup << "}\n"
437  << std::endl;
438  s.close();
439 
440  // --- TAR up everything -----------------------------------------
441  Int_t ret = gSystem->Exec(Form("tar -czf %s %s",
442  parFile.Data(), parName.Data()));
443  if (ret != 0) {
444  Error("ProofRailway::CreatePseudoPar", "Failed to pack up PAR file %s",
445  parFile.Data());
446  return false;
447  }
448 
449  // --- And upload the package ------------------------------------
450  ret = gProof->UploadPackage(parFile.Data(),TProof::kRemoveOld);
451  if (ret != 0) {
452  Error("ProofRailway::CreatePseudoPar",
453  "Failed to upload the AliROOT PAR file");
454  return false;
455  }
456  // Note, the PAR isn't enabled until much later when we've
457  // collected all the needed libraries in fExtraLibs
458  return true;
459  }
460  static void ExportEnvVar(TString& out, const TString& name)
461  {
462  TString env(gSystem->Getenv(name.Data()));
463  if (env.IsNull()) return;
464 
465  out.Append(Form(" gSystem->Setenv(\"%s\",\"%s\");\n",
466  name.Data(), env.Data()));
467  }
468 
492  {
493  TString parName(AliROOTParName());
494  TString envScript = fOptions.Get("env");
495  if (envScript.EqualTo("-none-", TString::kIgnoreCase) ||
496  !(envScript.EndsWith(".C") || envScript.EndsWith(".sh")))
497  envScript = "";
498 
499  Printf("Environment script: %s", envScript.Data());
500 
501  TString env = "";
502  ExportEnvVar(env, "ALICE");
503  ExportEnvVar(env, "ALICE_ROOT");
504 
505  TString setup(" gSystem->AddDynamicPath(\"${ALICE_ROOT}/lib\");\n"
506  " gSystem->AddIncludePath(\"-I${ALICE_ROOT}/include\");\n"
507  " Info(\"SETUP\",\"Loading ROOT libraries\");\n"
508  " Load(\"libTree\");\n"
509  " Load(\"libGeom\");\n"
510  " Load(\"libVMC\");\n"
511  " Load(\"libPhysics\");\n"
512  " Load(\"libMinuit\");\n"
513  " \n"
514  " Info(\"SETUP\",\"Parameter list:\");\n"
515  " if (!opts) return;\n"
516  " //opts->ls();\n"
517  "\n"
518  " TObject* par = opts->FindObject(\"ALIROOT_MODE\");\n"
519  " if (par) {\n"
520  " Info(\"SETUP\",\"ALIROOT mode: %s\",par->GetTitle());\n"
521  " TString mode(par->GetTitle());mode.ToLower();\n"
522  " if (mode.EqualTo(\"default\")) {\n"
523  " Load(\"libSTEERBase\");\n"
524  " Load(\"libESD\");\n"
525  " Load(\"libAOD\");\n"
526  " Load(\"libANALYSIS\");\n"
527  " Load(\"libANALYSISalice\");\n"
528  " }\n"
529  " else if (mode.EqualTo(\"aliroot\")) \n"
530  " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibs.C\");\n"
531  " else if (mode.EqualTo(\"rec\")) \n"
532  " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibsrec.C\");\n"
533  " else if (mode.EqualTo(\"sim\")) \n"
534  " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibssim.C\");\n"
535  " else if (mode.EqualTo(\"train\")) \n"
536  " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibstrain.C\");\n"
537  " else if (mode.EqualTo(\"custom\")) \n"
538  " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibstrain.C\");\n"
539  "}\n"
540  "\n"
541  "par = opts->FindObject(\"ALIROOT_EXTRA_LIBS\");\n"
542  "if (par) {\n"
543  " Info(\"SETUP\",\"Libaries to load: %s\\n\",\n"
544  " par->GetTitle());\n"
545  " TString tit(par->GetTitle());\n"
546  " TObjArray* tokens = tit.Tokenize(\":\");\n"
547  " TObject* lib = 0;\n"
548  " TIter next(tokens);\n"
549  " while ((lib = next())) {\n"
550  " TString libName(lib->GetName());\n"
551  " if (!libName.BeginsWith(\"lib\"))\n"
552  " libName.Prepend(\"lib\");\n"
553  " // Info(\"SETUP\",\"Loading %s ...\",libName.Data());\n"
554  " Load(libName.Data());\n"
555  " }\n"
556  "}");
557  // Note, the PAR isn't enabled until much later when we've
558  // collected all the needed libraries in fExtraLibs
559  return CreatePseudoPar(parName,envScript.IsNull() ? env : envScript,setup);
560  }
562  {
563  TString parName(AliPhysicsParName());
564  TString env = "";
565  ExportEnvVar(env, "ALICE_PHYSICS");
566  ExportEnvVar(env, "OADB_PATH");
567 
568  TString setup("gSystem->AddDynamicPath(\"$(ALICE_PHYSICS)/lib\");\n"
569  "gSystem->AddIncludePath(\"-I${ALICE_PHYSICS}/include\");\n"
570  "// Info(\"SETUP\",\"Parameter list:\");\n"
571  "if (!opts) return;\n"
572  "//opts->ls();\n"
573  "\n"
574  "TObject* par = opts->FindObject(\"ALIPHYSICS_MODE\");\n"
575  "if (par) {\n"
576  " //Info(\"SETUP\",\"ALIPHYSICS mode:%s\",par->GetTitle());\n"
577  " TString mode(par->GetTitle());mode.ToLower();\n"
578  " if (mode.EqualTo(\"default\")) {\n"
579  " gSystem->Load(\"libOADB\");\n"
580  " }\n"
581  "}\n"
582  "\n"
583  "par = opts->FindObject(\"ALIPHYSICS_EXTRA_LIBS\");\n"
584  "if (par) {\n"
585  " Info(\"SETUP\",\"Libaries to load: %s\\n\",\n"
586  " par->GetTitle());\n"
587  " TString tit(par->GetTitle());\n"
588  " TObjArray* tokens = tit.Tokenize(\":\");\n"
589  " TObject* lib = 0;\n"
590  " TIter next(tokens);\n"
591  " while ((lib = next())) {\n"
592  " TString libName(lib->GetName());\n"
593  " if (!libName.BeginsWith(\"lib\"))\n"
594  " libName.Prepend(\"lib\");\n"
595  " // Info(\"SETUP\",\"Loading %s ...\",libName.Data());\n"
596  " gSystem->Load(libName.Data());\n"
597  " }\n"
598  "}");
599  // Note, the PAR isn't enabled until much later when we've
600  // collected all the needed libraries in fExtraLibs
601  return CreatePseudoPar(parName,env,setup);
602  }
608  virtual UShort_t Mode() const { return kProof; }
612  virtual const char* ModeString() const { return "proof"; }
621  virtual Bool_t Connect(const TUrl& url, const TString& opts)
622  {
623  Info("ProofRailway::Connect", "Connecting to %s with %soptions %s",
624  url.GetUrl(),
625  opts.IsNull() ? "no " : "",
626  opts.Data());
627  TProof::Open(url.GetUrl(), opts);
628  // TProof::Open(connect.GetHost(), opts);
629  if (!gProof) {
630  Error("ProofRailway::Connect", "Failed to open Proof connection %s",
631  url.GetUrl());
632  return false;
633  }
634  return true;
635  }
641  virtual Bool_t PreSetup()
642  {
643  // --- Set prefered GSI method ---------------------------------
644  gEnv->SetValue("XSec.GSI.DelegProxy", "2");
645 
646  // --- Add ALICE_ROOT directory to search path for packages ----
647  // Info("ProofRailway::PreSetup", "Set location of packages");
648  gEnv->SetValue("Proof.GlobalPackageDirs",
649  Form("%s:%s:%s",
650  gEnv->GetValue("Proof.GlobalPackageDirs", "."),
651  gSystem->Getenv("ALICE_PHYSICS"),
652  gSystem->Getenv("ALICE_ROOT")));
653 
654  // --- Forming the URI we use to connect with --------------------
655  TUrl connect(fUrl);
656  connect.SetAnchor("");
657  connect.SetFile("");
658  connect.SetOptions("");
659 
660  // --- Check if we need to reset first ---------------------------
661  if (fOptions.Has("reset")) {
662  TString reset = fOptions.Get("reset");
663  Bool_t hard = (reset.IsNull() ||
664  reset.EqualTo("hard", TString::kIgnoreCase));
665  Info("ProofRailway::PreSetup", "Doing a %s reset of %s",
666  hard ? "hard" : "soft", connect.GetUrl());
667  TProof::Reset(connect.GetUrl(), hard);
668  Int_t secs = 3;
669  Info("ProofRailway::PreSetup",
670  "Waiting for %d second%s for things to settle", secs,
671  secs > 1 ? "s" : "");
672  gSystem->Sleep(1000*secs);
673  }
674 
675  // --- Check if we're using a wrapper ----------------------------
676  if (fOptions.Has("wrapper")) {
677  TString wrapper = fOptions.Get("wrapper");
678  if (wrapper.IsNull())
679  // In case of no argument, use GDB
680  // Just run and backtrace
681  wrapper = "/usr/bin/gdb --batch -ex run -ex bt --args";
682  Info("ProofRailway::PreSetup", "Using wrapper command: %s",
683  wrapper.Data());
684  TProof::AddEnvVar("PROOF_WRAPPERCMD", wrapper);
685  }
686 
687  // --- PAR parameters --------------------------------------------
688  fUsePars = fOptions.Has("par");
689  fBasePars = (fUsePars &&
690  fOptions.Get("par").EqualTo("all",TString::kIgnoreCase));
691  fTestBuild = fOptions.Has("testpar");
692 
693  // --- Connect to the cluster ------------------------------------
694  TString opts;
695  if (fOptions.Has("workers"))
696  opts.Append(Form("workers=%s", fOptions.Get("workers").Data()));
697 
698  if (!Connect(connect, opts)) return false;
699 
700  // --- Check if we need to clear packages ------------------------
701  if (fOptions.Has("clear")) {
702  TString pkgs = fOptions.Get("clear");
703  Info("PreSetup", "Clearing (%s)", pkgs.Data());
704  if (pkgs.IsNull() || pkgs.EqualTo("all", TString::kIgnoreCase)) {
705  // No value given, clear all
706  if (gProof->ClearPackages() != 0)
707  Warning("ProofRailway::PreSetup", "Failed to clear all packages");
708  Info("PreSetup", "Clearing cache");
709  gProof->ClearCache("*");
710  }
711  else {
712  // Tokenize on ',' and clear each package
713  TObjArray* pars = pkgs.Tokenize(",");
714  TObject* pkg = 0;
715  TIter next(pars);
716  while ((pkg = next())) {
717  if (TString(pkg->GetName()).EqualTo("cache")) {
718  Info("PreSetup", "Clearing cache");
719  gProof->ClearCache();
720  continue;
721  }
722  Info("PreSetup", "Clearing package %s", pkg->GetName());
723  if (gProof->ClearPackage(pkg->GetName()) != 0)
724  Warning("ProofRailway::PreSetup", "Failed to clear package %s",
725  pkg->GetName());
726  }
727  pars->Delete();
728  }
729  }
730  return true;
731  }
740  virtual Bool_t EnableSpecial(const TString& parName,
741  const TString& prefix)
742  {
743  if (prefix.IsNull()) {
744  Warning("EnableSpecial", "No prefix specified");
745  return false;
746  }
747  const char* prf = prefix.Data();
748  if (parName.IsNull()) {
749  Warning("EnableSpecial", "No par name specified for %s", prf);
750  return false;
751  }
752  // List of parameters for PAR x
753  TList* params = new TList;
754  params->SetOwner(true);
755 
756  // Extra libraries
757  TString tmp(fExtraLibs.Strip(TString::kBoth,':'));
758  params->Add(new TNamed(Form("%s_EXTRA_LIBS", prf), tmp.Data()));
759 
760  // Check for mode
761  if (fOptions.Has("mode"))
762  params->Add(new TNamed(Form("%s_MODE", prf),
763  fOptions.Get("mode").Data()));
764  else
765  params->Add(new TNamed(Form("%s_MODE", prf), "default"));
766 
767  // Check for AliEn
768  if (fOptions.Has("alien"))
769  params->Add(new TNamed(Form("%s_ENABLE_ALIEN", prf), "1"));
770 
771  // Try to enable the package - do we need to load first?
772  Int_t ret = gProof->EnablePackage(parName.Data(), params, true);
773  if (ret < 0) {
774  Error("ProofRailway::EnableSpecial", "Failed to enable %s PAR %s",
775  prf, parName.Data());
776  return false;
777  }
778  return true;
779  }
780 
787  {
788  return EnableSpecial(AliROOTParName(), "ALIROOT");
789  }
796  {
797  return EnableSpecial(AliPhysicsParName(), "ALIPHYSICS");
798  }
804  virtual Bool_t PostSetup()
805  {
806  AliAnalysisManager* mgr = AliAnalysisManager::GetAnalysisManager();
807  if (!mgr) {
808  Error("ProofRailway::PostSetup", "No analysis manager defined");
809  return false;
810  }
811 
812  // --- Check for output ------------------------------------------
813  if (fOptions.Has("dsname"))
815  if (fOptions.Has("storage"))
817 
818  // --- Check for feedback mechanism ------------------------------
819  if (!fOptions.Has("feedback"))
820  gProof->ClearFeedback();
821 
822  // --- If we are not using PARs for Base, enable special PAR -----
823  if (!fBasePars) {
824  if (!EnableAliROOT()) return false;
825  if (!EnableAliPhysics()) return false;
826  }
827 
828  // --- Make PAR file of Aux Files --------------------------------
829  if (fAuxFiles.GetEntries() > 0) {
830  TString name = TString::Format("%s_auxfiles", mgr->GetName());
832 
833  if (gProof->UploadPackage(name.Data(), TProof::kRemoveOld) < 0)
834  Error("ProofRailway::PostSetup", "Failed to upload PAR file %s",
835  name.Data());
836  else
837  fExtraPars.Append(Form(":%s", name.Data()));
838  }
839 
840  // --- Load par files --------------------------------------------
841  TString tmp = fExtraPars.Strip(TString::kBoth,':');
842  TObjArray* pars = tmp.Tokenize(":");
843  TObject* obj = 0;
844  TIter next(pars);
845  while ((obj = next())) {
846  // Enable the package, but do not build on client - already done
847  Int_t ret = gProof->EnablePackage(obj->GetName(), true);
848  if (ret < 0) {
849  Error("ProofRailway::PostSetup", "Failed to enable PAR %s",
850  obj->GetName());
851  return false;
852  }
853  }
854 
855  // --- Load extra sources ----------------------------------------
856  return LoadExtraSrcs();
857  }
859  {
860  TString tmp2 = fExtraSrcs.Strip(TString::kBoth, ':');
861  TObjArray* srcs = tmp2.Tokenize(":");
862  TIter next2(srcs);
863  TObject* obj = 0;
864  while ((obj = next2())) {
865  Int_t ret = gProof->Load(Form("%s+g", obj->GetName()), true);
866  if (ret < 0) {
867  Error("ProofRailway::PostSetup", "Failed to compile %s",obj->GetName());
868  return false;
869  }
870  }
871  return true;
872  }
878  virtual void GetDataSet(TString& dsname)
879  {
880  dsname = fUrl.GetFile();
881  }
890  {
891  AliAnalysisManager* mgr = AliAnalysisManager::GetAnalysisManager();
892  gProof->SetLogLevel(TMath::Max(fVerbose-2,0),
893  /* TProofDebug::kPacketizer| */
894  TProofDebug::kLoop|
895  /* TProofDebug::kSelector|
896  TProofDebug::kOutput|
897  TProofDebug::kInput|
898  TProofDebug::kGlobal|*/
899  TProofDebug::kPackage);
900  TString dsName;
901  GetDataSet(dsName);
902  // if (fUrl.GetAnchor() && fUrl.GetAnchor()[0] != '\0')
903  // dsName.Append(Form("#%s", fUrl.GetAnchor()));
904  // Info("Run", "Output objects registered with PROOF:");
905  // gProof->GetOutputList()->ls();
906  Long64_t off = fOptions.AsLong("offset", 0);
907  if (nEvents > 0 && nEvents < off) {
908  Warning("Run", "Number of events %lld < offset (%lld), stopping",
909  nEvents, off);
910  return 0;
911  }
912  Printf("Running proof analysis on data set %s", dsName.Data());
913  Long64_t ret = 0;
914  TChain* chain = LocalChain();
915  if (chain)
916  ret = mgr->StartAnalysis(fUrl.GetProtocol(), chain, nEvents, off);
917  else
918  ret = mgr->StartAnalysis(fUrl.GetProtocol(), dsName.Data(), nEvents, off);
919 
920  TString store = fOptions.Get("storage");
921  if (!store.IsNull() && store.EqualTo("auto",TString::kIgnoreCase))
923 
924  if (fVerbose > 10)
925  TProof::Mgr(fUrl.GetUrl())->GetSessionLogs()->Save("*","proof.log");
926  return ret;
927  }
928 #if 0
929  Bool_t AddMonitor(const TString& path)
930  {
931  if (path.IsNull()) return true;
932 
933  TObjArray* tokens = path.Tokenize("/");
934  Int_t nTokens = tokens->GetEntries();
935  if (nTokens < 2) {
936  Error("AddMonitor", "Monitors must be of the form:\n"
937  " <task>[:<slot>]/<name>\n"
938  " <task>[:<slot>]/<path>/<name>");
939  return false;
940  }
941  // --- Get the manager
942  AliAnalysisManager* mgr = AliAnalysisManager::GetAnalysisManager();
943 
944  // --- Extract task and possibly slot number
945  TString& sTask = static_cast<TObjString*>(tokens->At(0))->String();
946  Int_t slotNo = 0;
947  Ssiz_t colon = sTask.Index(":");
948  if (colon != kNPOS) {
949  TString sSlot = sTask(colon+1, sTask.Length()-colon-1);
950  if (!sSlot.IsNull()) slotNo = sSlot.Atoi();
951  sTask.Remove(colon, sTask.Length()-colon);
952  }
953 
954  AliAnalysisTask* task = mgr->GetTask(sTask);
955  if (!task) {
956  Error("AddMonitor", "Task \"%s\" not registered with manager",
957  sTask.Data());
958  return false;
959  }
960  AliAnalysisDataSlot* slot = task->GetOutputSlot(slotNo);
961  if (!slot) {
962  Error("AddMonitor", "Task \"%s\" does not have an output slot at %d",
963  task->GetName(), slotNo);
964  return false;
965  }
966  AliAnalysisDataContainer* cont = slot->GetContainer();
967  if (!cont) {
968  Error("AddMonitor", "Output slot %d of task \"%s\" has no container",
969  slotNo, task->GetName());
970  return false;
971  }
972  Int_t idx = 1;
973  TString& first = static_cast<TObjString*>(tokens->At(idx))->String();
974  if (first.EqualTo(cont->GetName())) {
975  idx++;
976  }
977  TObject* data = cont->GetData();
978  TObject* obj = data;
979  for (; idx < nTokens; idx++) {
980  }
981  return true;
982  }
983 #endif
984 
989  virtual void Print(Option_t* option="") const
990  {
991  Railway::Print(option);
992  std::cout << std::boolalpha
993  << " --- Other settings -------\n"
994  << " Extra libraries : " << fExtraLibs << "\n"
995  << " Extra PARs : " << fExtraPars << "\n"
996  << " Extra sources : " << fExtraSrcs << "\n"
997  << " Use PARs of tasks: " << fUsePars << "\n"
998  << " Use PARs of base : " << fBasePars
999  << std::noboolalpha << std::endl;
1000  }
1009  virtual Bool_t AuxFile(TString& name, bool copy=false)
1010  {
1011  TString local = name;
1012  Bool_t ret = Railway::AuxFile(local, copy);
1013  if (!name.BeginsWith("/")) {
1014  fAuxFiles.Add(new TObjString(local));
1015  }
1016  name = local;
1017 #if 0
1018  if (ret && name.EndsWith(".root")) {
1019  TFile* file = TFile::Open(name, "READ");
1020  if (file) {
1021  Info("AuxFile", "Adding input file %s", name.Data());
1022  gProof->AddInputData(file, true);
1023  }
1024  }
1025 #endif
1026  return ret;
1027  }
1029  {
1030  Int_t bufSize = 32768;
1031  Char_t buf[bufSize];
1032  Long64_t size = 0;
1033  Long_t id = 0, flags = 0, modtime = 0;
1034  if (gSystem->GetPathInfo(fileName.Data(), &id, &size, &flags, &modtime)==1
1035  || size <= 0) {
1036  Error("SendFile", "Cannot stat %s", fileName.Data());
1037  return -1;
1038  }
1039  TString fn(gSystem->BaseName(fileName.Data()));
1040  TList* slaves = 0; // gProof->GetListOfActiveSlaves(); - protected
1041  TIter next(slaves);
1042  TSlave* sl = 0;
1043  Int_t ret = 0;
1044  Int_t fd = open(fileName.Data(), O_RDONLY);
1045  while ((sl = static_cast<TSlave*>(next()))) {
1046  if (!sl->IsValid()) continue;
1047  if (sl->GetSlaveType() != TSlave::kSlave) continue;
1048 
1049  // Always binary (first 1), never forward (last 0).
1050  snprintf(buf,bufSize,"%s %d %lld %d", fn.Data(), 1, size, 0);
1051  if (sl->GetSocket()->Send(buf, kPROOF_SENDFILE) == -1) {
1052  Warning("SendFile", "Could not send kPROOF_SENDFILE request");
1053  continue;
1054  }
1055 
1056  // Go to the beginning of the file
1057  lseek(fd, 0, SEEK_SET);
1058  Int_t len = 0;
1059  do {
1060  while ((len = read(fd, buf, bufSize)) < 0 &&
1061  TSystem::GetErrno() == EINTR)
1062  TSystem::ResetErrno();
1063  if (len < 0) {
1064  Error("SendFile", "error reading input");
1065  close(fd);
1066  return -1;
1067  }
1068  if (len > 0 && sl->GetSocket()->SendRaw(buf, len) == -1) {
1069  Error("SendFile", "error writing to slave");
1070  sl = 0;
1071  break;
1072  }
1073  } while (len > 0);
1074  ret ++;
1075 
1076  // Wait for slave - private
1077  // if (sl) gProof->Collect(sl,gEnv->GetValue("Proof.CollectTimeout",-1));
1078  }
1079 
1080  // Close the file
1081  close(fd);
1082 
1083  return ret;
1084  }
1090  virtual TString OutputPath() const
1091  {
1092  TString ret;
1093  if (fOptions.Has("dsname")) {
1094  ret = Form("/%s/%s/", gProof->GetGroup(), gProof->GetUser());
1095  ret.Append(OutputUtilities::RegisteredDataset());
1096  }
1097  return ret;
1098  }
1102  virtual const Char_t* UrlHelp() const
1103  {
1104  return
1105  "proof://<host>[:<port>]/[<dataset>|<path>][?<options>][#<treeName>]";
1106  }
1110  virtual const char* Desc() const { return "PROOF"; }
1117  void AuxSave(const TString& escaped,
1118  Bool_t /*asShellScript*/)
1119  {
1120  // Write train to file - for terminate actions
1121  TFile* mgr = TFile::Open(Form("%s.root", escaped.Data()), "RECREATE");
1122  AliAnalysisManager::GetAnalysisManager()->Write();
1123  mgr->Write();
1124 
1125  TString libs = fExtraLibs; libs.ReplaceAll(":", " ");
1126  TString pars = fExtraPars; pars.ReplaceAll(":", " ");
1127  TString srcs = fExtraSrcs; srcs.ReplaceAll(":", " ");
1128  TString macDir("$ALICE_PHYSICS/PWGLF/FORWARD/trains");
1129  std::ofstream t("Terminate.C");
1130  if (!t) {
1131  Error("ProofRailway::AuxSave", "Failed to make terminate ROOT script");
1132  return;
1133  }
1134 
1135  t << "// Generated by ProofRailway\n"
1136  << "Bool_t Terminate()\n"
1137  << "{\n"
1138  << " TString name = \"" << escaped << "\";\n"
1139  << " TString libs = \"" << libs << "\";\n"
1140  << " TString pars = \"" << pars << "\";\n"
1141  << " TString srcs = \"" << srcs << "\";\n\n"
1142  << " gSystem->Load(\"libANALYSIS\");\n"
1143  << " gSystem->Load(\"libANALYSISalice\");\n"
1144  << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");\n\n"
1145  << " gSystem->AddIncludePath(\"-I$ALICE_PHYSICS/include\");\n\n"
1146  << " gROOT->LoadMacro(\"" << macDir << "/ParUtilities.C+g\");\n\n"
1147  << " gROOT->LoadMacro(\"" << macDir << "/ProofTerminate.C+g\");\n\n"
1148  << " return ProofTerminate(name,libs,pars,srcs);\n"
1149  << "}\n"
1150  << "// EOF\n"
1151  << std::endl;
1152  t.close();
1153  }
1161 };
1162 #endif
1163 //
1164 // EOF
1165 //
virtual Bool_t LoadLibrary(const TString &name, Bool_t slaves=true, Bool_t forcePar=false)
Definition: ProofRailway.C:193
ProofRailway & operator=(const ProofRailway &o)
Definition: ProofRailway.C:130
virtual Bool_t CreateAliPhysicsPar()
Definition: ProofRailway.C:561
virtual Bool_t CreatePseudoPar(const TString &parName, const TString &env, const TString &setup)
Definition: ProofRailway.C:329
OptionList fOptions
Definition: Railway.C:654
const char * url
const TString & MakeLibraryName(const TString &name)
Definition: Railway.C:497
virtual Bool_t EnableAliPhysics()
Definition: ProofRailway.C:795
long long Long64_t
Definition: External.C:43
TString fileName
TSystem * gSystem
char Char_t
Definition: External.C:18
virtual void Print(Option_t *="") const
Definition: Railway.C:430
Railway & operator=(const Railway &)
Definition: Railway.C:71
Int_t fVerbose
Definition: Railway.C:655
virtual const char * Desc() const
TString fExtraPars
Special output handling.
virtual Bool_t AuxFile(TString &name, bool copy=false)
static Bool_t StopXrootd()
Definition: External.C:92
Long64_t AsLong(const TString &name, Long64_t def=0) const
Definition: Option.C:659
const TString & Get(const TString &name) const
Definition: Option.C:596
virtual Bool_t PreSetup()
Definition: ProofRailway.C:641
virtual const char * ModeString() const
Definition: ProofRailway.C:612
virtual Bool_t Connect(const TUrl &url, const TString &opts)
Definition: ProofRailway.C:621
TChain * LocalChain()
Definition: Railway.C:586
virtual const char * AliPhysicsParName() const
Definition: ProofRailway.C:303
void AuxSave(const TString &escaped, Bool_t)
Bool_t Has(const TString &name) const
Definition: Option.C:584
int Int_t
Definition: External.C:63
virtual Bool_t CreateAliROOTPar()
Definition: ProofRailway.C:491
static Bool_t Find(const TString &what)
Definition: ParUtilities.C:65
ProofRailway(const ProofRailway &o)
Definition: ProofRailway.C:120
virtual Bool_t EnableSpecial(const TString &parName, const TString &prefix)
Definition: ProofRailway.C:740
virtual void GetDataSet(TString &dsname)
Definition: ProofRailway.C:878
virtual Bool_t AddIncludePath(const TString &path)
Definition: ProofRailway.C:173
TString fExtraLibs
virtual Bool_t LoadSource(const TString &name, bool copy=true)
Definition: ProofRailway.C:240
TUrl fUrl
Definition: Railway.C:653
virtual Bool_t AuxFile(TString &name, bool copy=false)
Definition: Railway.C:524
virtual void Print(Option_t *option="") const
Definition: ProofRailway.C:989
virtual Long64_t Run(Long64_t nEvents=-1)
Definition: ProofRailway.C:889
virtual Bool_t PostSetup()
Definition: ProofRailway.C:804
virtual Bool_t AddMonitor(const TString &)
Definition: Railway.C:419
TString fExtraSrcs
virtual Bool_t LoadAliROOT()
Definition: ProofRailway.C:251
virtual UShort_t Mode() const
Definition: ProofRailway.C:608
virtual Bool_t AddIncludePath(const TString &path)
Definition: Railway.C:201
void UsePar(Bool_t &use)
Definition: ProofRailway.C:160
virtual Bool_t EnableAliROOT()
Definition: ProofRailway.C:786
Option * Add(const TString &name, const TString &arg, const TString &desc, const TString &val="")
Definition: Option.C:421
PAR file utilities.
virtual const Char_t * UrlHelp() const
Int_t SendFile(const TString &fileName)
Float_t nEvents[nProd]
virtual Bool_t LoadExtraSrcs()
Definition: ProofRailway.C:858
virtual ~ProofRailway()
Definition: ProofRailway.C:146
TFile * file
Base class for analysis helpers.
virtual const char * AliROOTParName() const
Definition: ProofRailway.C:294
virtual Bool_t LoadAliPhysics()
Definition: ProofRailway.C:274
unsigned short UShort_t
Definition: External.C:28
static TString RegisteredDataset()
virtual TString OutputPath() const
const char Option_t
Definition: External.C:48
bool Bool_t
Definition: External.C:53
static Bool_t Build(const TString &what)
Definition: ParUtilities.C:130
static Bool_t RegisterDataset(const TString &dsname)
Bool_t fTestBuild
static Bool_t MakeAuxFilePAR(const TList &files, const TString &name, Bool_t verbose=false)
Definition: ParUtilities.C:538
static void ExportEnvVar(TString &out, const TString &name)
Definition: ProofRailway.C:460
Bool_t fBasePars
ProofRailway(const TUrl &url, Int_t verbose)
Definition: ProofRailway.C:94
virtual Bool_t LoadSource(const TString &name, bool copy=false)
Definition: Railway.C:231
static Bool_t RegisterStorage(const TString &url)