21 # include <TProofLog.h>
22 # include <TProofDebug.h>
23 # include <AliAnalysisManager.h>
// Register the options understood by this helper in fOptions.
// The four-argument calls follow the Add(name, arg, desc, val) shape:
// option name, placeholder for the option's argument, help text, and
// default value.
// "workers" and "offset" pass an integer 0 as default instead of a string.
104 fOptions.
Add(
"workers",
"N[x]",
"Number of workers to use", 0);
105 fOptions.
Add(
"dsname",
"NAME",
"Make output dataset",
"");
106 fOptions.
Add(
"par",
"tasks|all",
"Use par files",
"tasks");
107 fOptions.
Add(
"mode",
"default|rec|sim",
"AliROOT mode",
"default");
108 fOptions.
Add(
"storage",
"URL",
"Location for external storage",
"");
109 fOptions.
Add(
"wrapper",
"CMD",
"Wrapper command",
"");
110 fOptions.
Add(
"clear",
"PKGS",
"Clear packages ','-separated",
"");
111 fOptions.
Add(
"reset",
"soft|hard",
"Reset cluster",
"hard");
// Two-argument form: only a name and a description are given, with no
// argument placeholder and no default.
// NOTE(review): presumably a plain on/off flag - confirm against the
// option container's Add overloads.
112 fOptions.
Add(
"feedback",
"Enable feedback mechanism");
// NOTE(review): the default "-none-" looks like the sentinel that the
// PAR-creation code compares against (case-insensitively) to decide that
// no environment script was given - confirm.
113 fOptions.
Add(
"env",
"SCRIPT",
"Script to set-up environment",
"-none-");
114 fOptions.
Add(
"offset",
"EVENTS",
"Skip this number of events", 0);
116 if (!
fUrl.GetUser() ||
fUrl.GetUser()[0] ==
'\0')
132 if (&o ==
this)
return *
this;
177 if (!p.BeginsWith(
"-")) {
181 gProof->AddIncludePath(p);
199 if (name.EqualTo(
"STEERBase") ||
200 name.EqualTo(
"ESD") ||
201 name.EqualTo(
"AOD") ||
202 name.EqualTo(
"ANALYSIS") ||
203 name.EqualTo(
"OADB") ||
204 name.EqualTo(
"ANALYSISalice"))
207 if ((!
fUsePars || isBase) && !forcePar) {
209 if (ret < 0)
return false;
210 if (slaves)
fExtraLibs.Append(Form(
":%s", name.Data()));
214 Error(
"ProofRailway::LoadLibrary",
"Failed to find PAR file %s",
219 Error(
"ProofRailway::LoadLibrary",
"Failed to build PAR file %s",
223 if (gProof->UploadPackage(name.Data(), TProof::kRemoveOld) < 0) {
224 Error(
"ProofRailway::LoadLibrary",
"Failed to upload PAR file %s",
253 if (!
gSystem->Getenv(
"ALICE_ROOT")) {
254 Error(
"ProofRailway::LoadAliROOT",
"Local AliROOT not available");
276 if (!
gSystem->Getenv(
"ALICE_PHYSICS")) {
277 Error(
"ProofRailway::LoadAliPhysics",
278 "Local AliPhysics not available");
336 TString parFile(Form(
"%s.par", parName.Data()));
339 if (
gSystem->AccessPathName(parName.Data()) == 0) {
341 if (
gSystem->Exec(Form(
"rm -rf %s", parName.Data())) != 0) {
342 Error(
"ProofRailway",
"Failed to remove %s", parName.Data());
347 if (
gSystem->AccessPathName(parFile.Data()) == 0) {
348 if (
gSystem->Unlink(parFile.Data()) != 0) {
349 Error(
"ProofRailway::CreatePseudoPar",
"Failed to remove %s",
357 if (
gSystem->MakeDirectory(parName) < 0) {
358 Error(
"ProofRailway::CreatePseudoPar",
"Could not make directory '%s'",
363 if (
gSystem->MakeDirectory(Form(
"%s/PROOF-INF", parName.Data()))) {
364 Error(
"ProofRailway::CreatePseudoPar",
365 "Could not make directory %s/PROOF-INF",
372 if (!envScript.EndsWith(
".C") && !envScript.EndsWith(
".sh"))
374 Printf(
"Environment script: %s", envScript.Data());
377 std::ofstream b(Form(
"%s/PROOF-INF/BUILD.sh",parName.Data()));
379 Error(
"ProofRailway::CreatePseudoPar",
380 "Failed to make BUILD.sh shell script");
383 b <<
"#!/bin/sh\n\n";
384 if (envScript.EndsWith(
".sh"))
385 b <<
". PROOF-INF/" <<
gSystem->BaseName(envScript.Data()) <<
"\n";
387 b <<
"# echo Nothing to do\n";
388 b <<
"exit 0\n" << std::endl;
390 gSystem->Exec(Form(
"chmod a+x %s/PROOF-INF/BUILD.sh",parName.Data()));
392 if (!envScript.IsNull()) {
394 if (
gSystem->AccessPathName(envScript.Data()) == 0) {
396 if (
gSystem->Exec(Form(
"cp %s %s/PROOF-INF/", envScript.Data(),
397 parName.Data())) != 0) {
398 Error(
"ProofRailway",
"Failed to copy %s", envScript.Data());
403 Warning(
"CreateALIROOTPar",
"Couldn't read %s", envScript.Data());
408 std::ofstream s(Form(
"%s/PROOF-INF/SETUP.C", parName.Data()));
410 Error(
"ProofRailway::CreatePseudoPar",
411 "Failed to make SETUP.C ROOT script");
414 s <<
"Bool_t Load(const char* lib) {"
415 <<
" TString ln(lib);"
416 <<
" if (!ln.BeginsWith(\"lib\")) ln.Prepend(\"lib\");\n"
417 <<
" if (!ln.EndsWith(\".so\")) ln.Append(\".so\");\n"
418 <<
" TString w = gSystem->Which(gSystem->GetDynamicPath(),ln);\n"
419 <<
" if (w.IsNull()) { \n"
420 <<
" Warning(\"Load\",\"%s not found\",ln.Data());"
421 <<
" return false; }\n"
422 <<
" Info(\"Load\",\"Trying to load %s\",ln.Data());\n"
423 <<
" if (gSystem->Load(ln) < 0) return false;\n"
428 s <<
"void SETUP(TList* opts) {\n";
429 if (envScript.IsNull()) {
430 s << env << std::endl;
432 else if (envScript.EndsWith(
".C")) {
433 s <<
" gROOT->Macro(\"PROOF-INF/" <<
gSystem->BaseName(envScript.Data())
442 parFile.Data(), parName.Data()));
444 Error(
"ProofRailway::CreatePseudoPar",
"Failed to pack up PAR file %s",
450 ret = gProof->UploadPackage(parFile.Data(),TProof::kRemoveOld);
452 Error(
"ProofRailway::CreatePseudoPar",
453 "Failed to upload the AliROOT PAR file");
463 if (env.IsNull())
return;
465 out.Append(Form(
" gSystem->Setenv(\"%s\",\"%s\");\n",
466 name.Data(), env.Data()));
495 if (envScript.EqualTo(
"-none-", TString::kIgnoreCase) ||
496 !(envScript.EndsWith(
".C") || envScript.EndsWith(
".sh")))
499 Printf(
"Environment script: %s", envScript.Data());
505 TString setup(
" gSystem->AddDynamicPath(\"${ALICE_ROOT}/lib\");\n"
506 " gSystem->AddIncludePath(\"-I${ALICE_ROOT}/include\");\n"
507 " Info(\"SETUP\",\"Loading ROOT libraries\");\n"
508 " Load(\"libTree\");\n"
509 " Load(\"libGeom\");\n"
510 " Load(\"libVMC\");\n"
511 " Load(\"libPhysics\");\n"
512 " Load(\"libMinuit\");\n"
514 " Info(\"SETUP\",\"Parameter list:\");\n"
515 " if (!opts) return;\n"
518 " TObject* par = opts->FindObject(\"ALIROOT_MODE\");\n"
520 " Info(\"SETUP\",\"ALIROOT mode: %s\",par->GetTitle());\n"
521 " TString mode(par->GetTitle());mode.ToLower();\n"
522 " if (mode.EqualTo(\"default\")) {\n"
523 " Load(\"libSTEERBase\");\n"
524 " Load(\"libESD\");\n"
525 " Load(\"libAOD\");\n"
526 " Load(\"libANALYSIS\");\n"
527 " Load(\"libANALYSISalice\");\n"
529 " else if (mode.EqualTo(\"aliroot\")) \n"
530 " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibs.C\");\n"
531 " else if (mode.EqualTo(\"rec\")) \n"
532 " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibsrec.C\");\n"
533 " else if (mode.EqualTo(\"sim\")) \n"
534 " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibssim.C\");\n"
535 " else if (mode.EqualTo(\"train\")) \n"
536 " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibstrain.C\");\n"
537 " else if (mode.EqualTo(\"custom\")) \n"
538 " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibstrain.C\");\n"
541 "par = opts->FindObject(\"ALIROOT_EXTRA_LIBS\");\n"
543 " Info(\"SETUP\",\"Libaries to load: %s\\n\",\n"
544 " par->GetTitle());\n"
545 " TString tit(par->GetTitle());\n"
546 " TObjArray* tokens = tit.Tokenize(\":\");\n"
547 " TObject* lib = 0;\n"
548 " TIter next(tokens);\n"
549 " while ((lib = next())) {\n"
550 " TString libName(lib->GetName());\n"
551 " if (!libName.BeginsWith(\"lib\"))\n"
552 " libName.Prepend(\"lib\");\n"
553 " // Info(\"SETUP\",\"Loading %s ...\",libName.Data());\n"
554 " Load(libName.Data());\n"
559 return CreatePseudoPar(parName,envScript.IsNull() ? env : envScript,setup);
568 TString setup(
"gSystem->AddDynamicPath(\"$(ALICE_PHYSICS)/lib\");\n"
569 "gSystem->AddIncludePath(\"-I${ALICE_PHYSICS}/include\");\n"
570 "// Info(\"SETUP\",\"Parameter list:\");\n"
571 "if (!opts) return;\n"
574 "TObject* par = opts->FindObject(\"ALIPHYSICS_MODE\");\n"
576 " //Info(\"SETUP\",\"ALIPHYSICS mode:%s\",par->GetTitle());\n"
577 " TString mode(par->GetTitle());mode.ToLower();\n"
578 " if (mode.EqualTo(\"default\")) {\n"
579 " gSystem->Load(\"libOADB\");\n"
583 "par = opts->FindObject(\"ALIPHYSICS_EXTRA_LIBS\");\n"
585 " Info(\"SETUP\",\"Libaries to load: %s\\n\",\n"
586 " par->GetTitle());\n"
587 " TString tit(par->GetTitle());\n"
588 " TObjArray* tokens = tit.Tokenize(\":\");\n"
589 " TObject* lib = 0;\n"
590 " TIter next(tokens);\n"
591 " while ((lib = next())) {\n"
592 " TString libName(lib->GetName());\n"
593 " if (!libName.BeginsWith(\"lib\"))\n"
594 " libName.Prepend(\"lib\");\n"
595 " // Info(\"SETUP\",\"Loading %s ...\",libName.Data());\n"
596 " gSystem->Load(libName.Data());\n"
623 Info(
"ProofRailway::Connect",
"Connecting to %s with %soptions %s",
625 opts.IsNull() ?
"no " :
"",
627 TProof::Open(url.GetUrl(), opts);
630 Error(
"ProofRailway::Connect",
"Failed to open Proof connection %s",
644 gEnv->SetValue(
"XSec.GSI.DelegProxy",
"2");
648 gEnv->SetValue(
"Proof.GlobalPackageDirs",
650 gEnv->GetValue(
"Proof.GlobalPackageDirs",
"."),
651 gSystem->Getenv(
"ALICE_PHYSICS"),
652 gSystem->Getenv(
"ALICE_ROOT")));
656 connect.SetAnchor(
"");
658 connect.SetOptions(
"");
// Decide the kind of reset: an empty reset string or "hard" (compared
// case-insensitively) selects a hard reset; any other value gives a
// soft reset.
663 Bool_t hard = (reset.IsNull() ||
664 reset.EqualTo(
"hard", TString::kIgnoreCase));
665 Info(
"ProofRailway::PreSetup",
"Doing a %s reset of %s",
666 hard ?
"hard" :
"soft", connect.GetUrl());
// Reset the sessions at the connection URL; 'hard' is passed through
// as the second argument.
667 TProof::Reset(connect.GetUrl(), hard);
669 Info(
"ProofRailway::PreSetup",
670 "Waiting for %d second%s for things to settle", secs,
671 secs > 1 ?
"s" :
"");
678 if (wrapper.IsNull())
681 wrapper =
"/usr/bin/gdb --batch -ex run -ex bt --args";
682 Info(
"ProofRailway::PreSetup",
"Using wrapper command: %s",
684 TProof::AddEnvVar(
"PROOF_WRAPPERCMD", wrapper);
690 fOptions.
Get(
"par").EqualTo(
"all",TString::kIgnoreCase));
696 opts.Append(Form(
"workers=%s",
fOptions.
Get(
"workers").Data()));
698 if (!
Connect(connect, opts))
return false;
703 Info(
"PreSetup",
"Clearing (%s)", pkgs.Data());
704 if (pkgs.IsNull() || pkgs.EqualTo(
"all", TString::kIgnoreCase)) {
706 if (gProof->ClearPackages() != 0)
707 Warning(
"ProofRailway::PreSetup",
"Failed to clear all packages");
708 Info(
"PreSetup",
"Clearing cache");
709 gProof->ClearCache(
"*");
716 while ((pkg = next())) {
717 if (
TString(pkg->GetName()).EqualTo(
"cache")) {
718 Info(
"PreSetup",
"Clearing cache");
719 gProof->ClearCache();
722 Info(
"PreSetup",
"Clearing package %s", pkg->GetName());
723 if (gProof->ClearPackage(pkg->GetName()) != 0)
724 Warning(
"ProofRailway::PreSetup",
"Failed to clear package %s",
743 if (prefix.IsNull()) {
744 Warning(
"EnableSpecial",
"No prefix specified");
747 const char* prf = prefix.Data();
748 if (parName.IsNull()) {
749 Warning(
"EnableSpecial",
"No par name specified for %s", prf);
754 params->SetOwner(
true);
758 params->Add(
new TNamed(Form(
"%s_EXTRA_LIBS", prf), tmp.Data()));
762 params->Add(
new TNamed(Form(
"%s_MODE", prf),
765 params->Add(
new TNamed(Form(
"%s_MODE", prf),
"default"));
769 params->Add(
new TNamed(Form(
"%s_ENABLE_ALIEN", prf),
"1"));
772 Int_t ret = gProof->EnablePackage(parName.Data(), params,
true);
774 Error(
"ProofRailway::EnableSpecial",
"Failed to enable %s PAR %s",
775 prf, parName.Data());
808 Error(
"ProofRailway::PostSetup",
"No analysis manager defined");
820 gProof->ClearFeedback();
830 TString name = TString::Format(
"%s_auxfiles", mgr->GetName());
833 if (gProof->UploadPackage(name.Data(), TProof::kRemoveOld) < 0)
834 Error(
"ProofRailway::PostSetup",
"Failed to upload PAR file %s",
845 while ((obj = next())) {
847 Int_t ret = gProof->EnablePackage(obj->GetName(),
true);
849 Error(
"ProofRailway::PostSetup",
"Failed to enable PAR %s",
864 while ((obj = next2())) {
865 Int_t ret = gProof->Load(Form(
"%s+g", obj->GetName()),
true);
867 Error(
"ProofRailway::PostSetup",
"Failed to compile %s",obj->GetName());
880 dsname =
fUrl.GetFile();
892 gProof->SetLogLevel(TMath::Max(
fVerbose-2,0),
899 TProofDebug::kPackage);
908 Warning(
"Run",
"Number of events %lld < offset (%lld), stopping",
912 Printf(
"Running proof analysis on data set %s", dsName.Data());
916 ret = mgr->StartAnalysis(
fUrl.GetProtocol(), chain,
nEvents, off);
918 ret = mgr->StartAnalysis(
fUrl.GetProtocol(), dsName.Data(),
nEvents, off);
921 if (!store.IsNull() && store.EqualTo(
"auto",TString::kIgnoreCase))
925 TProof::Mgr(
fUrl.GetUrl())->GetSessionLogs()->Save(
"*",
"proof.log");
931 if (path.IsNull())
return true;
934 Int_t nTokens = tokens->GetEntries();
936 Error(
"AddMonitor",
"Monitors must be of the form:\n"
937 " <task>[:<slot>]/<name>\n"
938 " <task>[:<slot>]/<path>/<name>");
945 TString& sTask =
static_cast<TObjString*
>(tokens->At(0))->String();
// Split an optional ":<slot>" suffix off the task token.
947 Ssiz_t colon = sTask.Index(
":");
948 if (colon != kNPOS) {
// Everything after the colon is the slot specification.
949 TString sSlot = sTask(colon+1, sTask.Length()-colon-1);
// An empty specification (e.g. "task:") leaves slotNo at its prior value.
950 if (!sSlot.IsNull()) slotNo = sSlot.Atoi();
// Strip the colon and the suffix so that sTask holds only the task name.
951 sTask.Remove(colon, sTask.Length()-colon);
956 Error(
"AddMonitor",
"Task \"%s\" not registered with manager",
960 AliAnalysisDataSlot* slot = task->GetOutputSlot(slotNo);
962 Error(
"AddMonitor",
"Task \"%s\" does not have an output slot at %d",
963 task->GetName(), slotNo);
966 AliAnalysisDataContainer* cont = slot->GetContainer();
968 Error(
"AddMonitor",
"Output slot %d of task \"%s\" has no container",
969 slotNo, task->GetName());
973 TString& first =
static_cast<TObjString*
>(tokens->At(idx))->String();
974 if (first.EqualTo(cont->GetName())) {
977 TObject* data = cont->GetData();
979 for (; idx < nTokens; idx++) {
992 std::cout << std::boolalpha
993 <<
" --- Other settings -------\n"
994 <<
" Extra libraries : " <<
fExtraLibs <<
"\n"
997 <<
" Use PARs of tasks: " <<
fUsePars <<
"\n"
999 << std::noboolalpha << std::endl;
1013 if (!name.BeginsWith(
"/")) {
1018 if (ret && name.EndsWith(
".root")) {
1019 TFile*
file = TFile::Open(name,
"READ");
1021 Info(
"AuxFile",
"Adding input file %s", name.Data());
1022 gProof->AddInputData(file,
true);
1030 Int_t bufSize = 32768;
1033 Long_t
id = 0, flags = 0, modtime = 0;
1034 if (
gSystem->GetPathInfo(fileName.Data(), &id, &size, &flags, &modtime)==1
1036 Error(
"SendFile",
"Cannot stat %s", fileName.Data());
1044 Int_t fd = open(fileName.Data(), O_RDONLY);
1045 while ((sl = static_cast<TSlave*>(next()))) {
1046 if (!sl->IsValid())
continue;
1047 if (sl->GetSlaveType() != TSlave::kSlave)
continue;
1050 snprintf(buf,bufSize,
"%s %d %lld %d", fn.Data(), 1, size, 0);
1051 if (sl->GetSocket()->Send(buf, kPROOF_SENDFILE) == -1) {
1052 Warning(
"SendFile",
"Could not send kPROOF_SENDFILE request");
1057 lseek(fd, 0, SEEK_SET);
// Read up to bufSize bytes from fd into buf.  As long as the read fails
// and the error is EINTR, clear the error code and try again; the loop
// ends on success, end-of-file, or any other error (len < 0).
1060 while ((len = read(fd, buf, bufSize)) < 0 &&
1061 TSystem::GetErrno() == EINTR)
1062 TSystem::ResetErrno();
1064 Error(
"SendFile",
"error reading input");
1068 if (len > 0 && sl->GetSocket()->SendRaw(buf, len) == -1) {
1069 Error(
"SendFile",
"error writing to slave");
1094 ret = Form(
"/%s/%s/", gProof->GetGroup(), gProof->GetUser());
1105 "proof://<host>[:<port>]/[<dataset>|<path>][?<options>][#<treeName>]";
/**
 * Short description of this helper.
 *
 * @return The constant string "PROOF"
 */
1110 virtual const char*
Desc()
const {
return "PROOF"; }
1121 TFile* mgr = TFile::Open(Form(
"%s.root", escaped.Data()),
"RECREATE");
1122 AliAnalysisManager::GetAnalysisManager()->Write();
1128 TString macDir(
"$ALICE_PHYSICS/PWGLF/FORWARD/trains");
1129 std::ofstream t(
"Terminate.C");
1131 Error(
"ProofRailway::AuxSave",
"Failed to make terminate ROOT script");
1135 t <<
"// Generated by ProofRailway\n"
1136 <<
"Bool_t Terminate()\n"
1138 <<
" TString name = \"" << escaped <<
"\";\n"
1139 <<
" TString libs = \"" << libs <<
"\";\n"
1140 <<
" TString pars = \"" << pars <<
"\";\n"
1141 <<
" TString srcs = \"" << srcs <<
"\";\n\n"
1142 <<
" gSystem->Load(\"libANALYSIS\");\n"
1143 <<
" gSystem->Load(\"libANALYSISalice\");\n"
1144 <<
" gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");\n\n"
1145 <<
" gSystem->AddIncludePath(\"-I$ALICE_PHYSICS/include\");\n\n"
1146 <<
" gROOT->LoadMacro(\"" << macDir <<
"/ParUtilities.C+g\");\n\n"
1147 <<
" gROOT->LoadMacro(\"" << macDir <<
"/ProofTerminate.C+g\");\n\n"
1148 <<
" return ProofTerminate(name,libs,pars,srcs);\n"
virtual Bool_t LoadLibrary(const TString &name, Bool_t slaves=true, Bool_t forcePar=false)
ProofRailway & operator=(const ProofRailway &o)
virtual Bool_t CreateAliPhysicsPar()
virtual Bool_t CreatePseudoPar(const TString &parName, const TString &env, const TString &setup)
const TString & MakeLibraryName(const TString &name)
virtual Bool_t EnableAliPhysics()
virtual void Print(Option_t *="") const
Railway & operator=(const Railway &)
virtual const char * Desc() const
virtual Bool_t AuxFile(TString &name, bool copy=false)
static Bool_t StopXrootd()
Long64_t AsLong(const TString &name, Long64_t def=0) const
const TString & Get(const TString &name) const
virtual Bool_t PreSetup()
virtual const char * ModeString() const
virtual Bool_t Connect(const TUrl &url, const TString &opts)
virtual const char * AliPhysicsParName() const
void AuxSave(const TString &escaped, Bool_t)
Bool_t Has(const TString &name) const
virtual Bool_t CreateAliROOTPar()
static Bool_t Find(const TString &what)
ProofRailway(const ProofRailway &o)
virtual Bool_t EnableSpecial(const TString &parName, const TString &prefix)
virtual void GetDataSet(TString &dsname)
virtual Bool_t AddIncludePath(const TString &path)
virtual Bool_t LoadSource(const TString &name, bool copy=true)
virtual Bool_t AuxFile(TString &name, bool copy=false)
virtual void Print(Option_t *option="") const
virtual Long64_t Run(Long64_t nEvents=-1)
virtual Bool_t PostSetup()
virtual Bool_t AddMonitor(const TString &)
virtual Bool_t LoadAliROOT()
virtual UShort_t Mode() const
virtual Bool_t AddIncludePath(const TString &path)
virtual Bool_t EnableAliROOT()
Option * Add(const TString &name, const TString &arg, const TString &desc, const TString &val="")
virtual const Char_t * UrlHelp() const
Int_t SendFile(const TString &fileName)
virtual Bool_t LoadExtraSrcs()
Base class for analysis helpers.
virtual const char * AliROOTParName() const
virtual Bool_t LoadAliPhysics()
static TString RegisteredDataset()
virtual TString OutputPath() const
static Bool_t Build(const TString &what)
static Bool_t RegisterDataset(const TString &dsname)
static Bool_t MakeAuxFilePAR(const TList &files, const TString &name, Bool_t verbose=false)
static void ExportEnvVar(TString &out, const TString &name)
ProofRailway(const TUrl &url, Int_t verbose)
virtual Bool_t LoadSource(const TString &name, bool copy=false)
static Bool_t RegisterStorage(const TString &url)