21 # include <TProofLog.h> 22 # include <TProofDebug.h> 23 # include <AliAnalysisManager.h> 104 fOptions.
Add(
"workers",
"N[x]",
"Number of workers to use", 0);
105 fOptions.
Add(
"dsname",
"NAME",
"Make output dataset",
"");
106 fOptions.
Add(
"par",
"tasks|all",
"Use par files",
"tasks");
107 fOptions.
Add(
"mode",
"default|rec|sim",
"AliROOT mode",
"default");
108 fOptions.
Add(
"storage",
"URL",
"Location for external storage",
"");
109 fOptions.
Add(
"wrapper",
"CMD",
"Wrapper command",
"");
110 fOptions.
Add(
"clear",
"PKGS",
"Clear packages ','-separated",
"");
111 fOptions.
Add(
"reset",
"soft|hard",
"Reset cluster",
"hard");
112 fOptions.
Add(
"feedback",
"Enable feedback mechanism");
113 fOptions.
Add(
"env",
"SCRIPT",
"Script to set-up environment",
"-none-");
114 fOptions.
Add(
"offset",
"EVENTS",
"Skip this number of events", 0);
116 if (!
fUrl.GetUser() ||
fUrl.GetUser()[0] ==
'\0')
132 if (&o ==
this)
return *
this;
177 if (!p.BeginsWith(
"-")) {
181 gProof->AddIncludePath(p);
199 if (name.EqualTo(
"STEERBase") ||
200 name.EqualTo(
"ESD") ||
201 name.EqualTo(
"AOD") ||
202 name.EqualTo(
"ANALYSIS") ||
203 name.EqualTo(
"OADB") ||
204 name.EqualTo(
"ANALYSISalice"))
207 if ((!
fUsePars || isBase) && !forcePar) {
209 if (ret < 0)
return false;
210 if (slaves)
fExtraLibs.Append(Form(
":%s", name.Data()));
214 Error(
"ProofRailway::LoadLibrary",
"Failed to find PAR file %s",
219 Error(
"ProofRailway::LoadLibrary",
"Failed to build PAR file %s",
223 if (gProof->UploadPackage(name.Data(), TProof::kRemoveOld) < 0) {
224 Error(
"ProofRailway::LoadLibrary",
"Failed to upload PAR file %s",
253 if (!
gSystem->Getenv(
"ALICE_ROOT")) {
254 Error(
"ProofRailway::LoadAliROOT",
"Local AliROOT not available");
276 if (!
gSystem->Getenv(
"ALICE_PHYSICS")) {
277 Error(
"ProofRailway::LoadAliPhysics",
278 "Local AliPhysics not available");
336 TString parFile(Form(
"%s.par", parName.Data()));
339 if (
gSystem->AccessPathName(parName.Data()) == 0) {
341 if (
gSystem->Exec(Form(
"rm -rf %s", parName.Data())) != 0) {
342 Error(
"ProofRailway",
"Failed to remove %s", parName.Data());
347 if (
gSystem->AccessPathName(parFile.Data()) == 0) {
348 if (
gSystem->Unlink(parFile.Data()) != 0) {
349 Error(
"ProofRailway::CreatePseudoPar",
"Failed to remove %s",
357 if (
gSystem->MakeDirectory(parName) < 0) {
358 Error(
"ProofRailway::CreatePseudoPar",
"Could not make directory '%s'",
363 if (
gSystem->MakeDirectory(Form(
"%s/PROOF-INF", parName.Data()))) {
364 Error(
"ProofRailway::CreatePseudoPar",
365 "Could not make directory %s/PROOF-INF",
372 if (!envScript.EndsWith(
".C") && !envScript.EndsWith(
".sh"))
374 Printf(
"Environment script: %s", envScript.Data());
377 std::ofstream b(Form(
"%s/PROOF-INF/BUILD.sh",parName.Data()));
379 Error(
"ProofRailway::CreatePseudoPar",
380 "Failed to make BUILD.sh shell script");
383 b <<
"#!/bin/sh\n\n";
384 if (envScript.EndsWith(
".sh"))
385 b <<
". PROOF-INF/" <<
gSystem->BaseName(envScript.Data()) <<
"\n";
387 b <<
"# echo Nothing to do\n";
388 b <<
"exit 0\n" << std::endl;
390 gSystem->Exec(Form(
"chmod a+x %s/PROOF-INF/BUILD.sh",parName.Data()));
392 if (!envScript.IsNull()) {
394 if (
gSystem->AccessPathName(envScript.Data()) == 0) {
396 if (
gSystem->Exec(Form(
"cp %s %s/PROOF-INF/", envScript.Data(),
397 parName.Data())) != 0) {
398 Error(
"ProofRailway",
"Failed to copy %s", envScript.Data());
403 Warning(
"CreateALIROOTPar",
"Couldn't read %s", envScript.Data());
408 std::ofstream s(Form(
"%s/PROOF-INF/SETUP.C", parName.Data()));
410 Error(
"ProofRailway::CreatePseudoPar",
411 "Failed to make SETUP.C ROOT script");
414 s <<
"Bool_t Load(const char* lib) {" 415 <<
" TString ln(lib);" 416 <<
" if (!ln.BeginsWith(\"lib\")) ln.Prepend(\"lib\");\n" 417 <<
" if (!ln.EndsWith(\".so\")) ln.Append(\".so\");\n" 418 <<
" TString w = gSystem->Which(gSystem->GetDynamicPath(),ln);\n" 419 <<
" if (w.IsNull()) { \n" 420 <<
" Warning(\"Load\",\"%s not found\",ln.Data());" 421 <<
" return false; }\n" 422 <<
" Info(\"Load\",\"Trying to load %s\",ln.Data());\n" 423 <<
" if (gSystem->Load(ln) < 0) return false;\n" 428 s <<
"void SETUP(TList* opts) {\n";
429 if (envScript.IsNull()) {
430 s << env << std::endl;
432 else if (envScript.EndsWith(
".C")) {
433 s <<
" gROOT->Macro(\"PROOF-INF/" <<
gSystem->BaseName(envScript.Data())
442 parFile.Data(), parName.Data()));
444 Error(
"ProofRailway::CreatePseudoPar",
"Failed to pack up PAR file %s",
450 ret = gProof->UploadPackage(parFile.Data(),TProof::kRemoveOld);
452 Error(
"ProofRailway::CreatePseudoPar",
453 "Failed to upload the AliROOT PAR file");
463 if (env.IsNull())
return;
465 out.Append(Form(
" gSystem->Setenv(\"%s\",\"%s\");\n",
466 name.Data(), env.Data()));
495 if (envScript.EqualTo(
"-none-", TString::kIgnoreCase) ||
496 !(envScript.EndsWith(
".C") || envScript.EndsWith(
".sh")))
499 Printf(
"Environment script: %s", envScript.Data());
505 TString setup(
" gSystem->AddDynamicPath(\"${ALICE_ROOT}/lib\");\n" 506 " gSystem->AddIncludePath(\"-I${ALICE_ROOT}/include\");\n" 507 " Info(\"SETUP\",\"Loading ROOT libraries\");\n" 508 " Load(\"libTree\");\n" 509 " Load(\"libGeom\");\n" 510 " Load(\"libVMC\");\n" 511 " Load(\"libPhysics\");\n" 512 " Load(\"libMinuit\");\n" 514 " Info(\"SETUP\",\"Parameter list:\");\n" 515 " if (!opts) return;\n" 518 " TObject* par = opts->FindObject(\"ALIROOT_MODE\");\n" 520 " Info(\"SETUP\",\"ALIROOT mode: %s\",par->GetTitle());\n" 521 " TString mode(par->GetTitle());mode.ToLower();\n" 522 " if (mode.EqualTo(\"default\")) {\n" 523 " Load(\"libSTEERBase\");\n" 524 " Load(\"libESD\");\n" 525 " Load(\"libAOD\");\n" 526 " Load(\"libANALYSIS\");\n" 527 " Load(\"libANALYSISalice\");\n" 529 " else if (mode.EqualTo(\"aliroot\")) \n" 530 " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibs.C\");\n" 531 " else if (mode.EqualTo(\"rec\")) \n" 532 " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibsrec.C\");\n" 533 " else if (mode.EqualTo(\"sim\")) \n" 534 " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibssim.C\");\n" 535 " else if (mode.EqualTo(\"train\")) \n" 536 " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibstrain.C\");\n" 537 " else if (mode.EqualTo(\"custom\")) \n" 538 " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibstrain.C\");\n" 541 "par = opts->FindObject(\"ALIROOT_EXTRA_LIBS\");\n" 543 " Info(\"SETUP\",\"Libaries to load: %s\\n\",\n" 544 " par->GetTitle());\n" 545 " TString tit(par->GetTitle());\n" 546 " TObjArray* tokens = tit.Tokenize(\":\");\n" 547 " TObject* lib = 0;\n" 548 " TIter next(tokens);\n" 549 " while ((lib = next())) {\n" 550 " TString libName(lib->GetName());\n" 551 " if (!libName.BeginsWith(\"lib\"))\n" 552 " libName.Prepend(\"lib\");\n" 553 " // Info(\"SETUP\",\"Loading %s ...\",libName.Data());\n" 554 " Load(libName.Data());\n" 559 return CreatePseudoPar(parName,envScript.IsNull() ? env : envScript,setup);
568 TString setup(
"gSystem->AddDynamicPath(\"$(ALICE_PHYSICS)/lib\");\n" 569 "gSystem->AddIncludePath(\"-I${ALICE_PHYSICS}/include\");\n" 570 "// Info(\"SETUP\",\"Parameter list:\");\n" 571 "if (!opts) return;\n" 574 "TObject* par = opts->FindObject(\"ALIPHYSICS_MODE\");\n" 576 " //Info(\"SETUP\",\"ALIPHYSICS mode:%s\",par->GetTitle());\n" 577 " TString mode(par->GetTitle());mode.ToLower();\n" 578 " if (mode.EqualTo(\"default\")) {\n" 579 " gSystem->Load(\"libOADB\");\n" 583 "par = opts->FindObject(\"ALIPHYSICS_EXTRA_LIBS\");\n" 585 " Info(\"SETUP\",\"Libaries to load: %s\\n\",\n" 586 " par->GetTitle());\n" 587 " TString tit(par->GetTitle());\n" 588 " TObjArray* tokens = tit.Tokenize(\":\");\n" 589 " TObject* lib = 0;\n" 590 " TIter next(tokens);\n" 591 " while ((lib = next())) {\n" 592 " TString libName(lib->GetName());\n" 593 " if (!libName.BeginsWith(\"lib\"))\n" 594 " libName.Prepend(\"lib\");\n" 595 " // Info(\"SETUP\",\"Loading %s ...\",libName.Data());\n" 596 " gSystem->Load(libName.Data());\n" 623 Info(
"ProofRailway::Connect",
"Connecting to %s with %soptions %s",
625 opts.IsNull() ?
"no " :
"",
627 TProof::Open(url.GetUrl(), opts);
630 Error(
"ProofRailway::Connect",
"Failed to open Proof connection %s",
644 gEnv->SetValue(
"XSec.GSI.DelegProxy",
"2");
648 gEnv->SetValue(
"Proof.GlobalPackageDirs",
650 gEnv->GetValue(
"Proof.GlobalPackageDirs",
"."),
651 gSystem->Getenv(
"ALICE_PHYSICS"),
652 gSystem->Getenv(
"ALICE_ROOT")));
656 connect.SetAnchor(
"");
658 connect.SetOptions(
"");
663 Bool_t hard = (reset.IsNull() ||
664 reset.EqualTo(
"hard", TString::kIgnoreCase));
665 Info(
"ProofRailway::PreSetup",
"Doing a %s reset of %s",
666 hard ?
"hard" :
"soft", connect.GetUrl());
667 TProof::Reset(connect.GetUrl(), hard);
669 Info(
"ProofRailway::PreSetup",
670 "Waiting for %d second%s for things to settle", secs,
671 secs > 1 ?
"s" :
"");
678 if (wrapper.IsNull())
681 wrapper =
"/usr/bin/gdb --batch -ex run -ex bt --args";
682 Info(
"ProofRailway::PreSetup",
"Using wrapper command: %s",
684 TProof::AddEnvVar(
"PROOF_WRAPPERCMD", wrapper);
690 fOptions.
Get(
"par").EqualTo(
"all",TString::kIgnoreCase));
696 opts.Append(Form(
"workers=%s",
fOptions.
Get(
"workers").Data()));
698 if (!
Connect(connect, opts))
return false;
703 Info(
"PreSetup",
"Clearing (%s)", pkgs.Data());
704 if (pkgs.IsNull() || pkgs.EqualTo(
"all", TString::kIgnoreCase)) {
706 if (gProof->ClearPackages() != 0)
707 Warning(
"ProofRailway::PreSetup",
"Failed to clear all packages");
708 Info(
"PreSetup",
"Clearing cache");
709 gProof->ClearCache(
"*");
716 while ((pkg = next())) {
717 if (
TString(pkg->GetName()).EqualTo(
"cache")) {
718 Info(
"PreSetup",
"Clearing cache");
719 gProof->ClearCache();
722 Info(
"PreSetup",
"Clearing package %s", pkg->GetName());
723 if (gProof->ClearPackage(pkg->GetName()) != 0)
724 Warning(
"ProofRailway::PreSetup",
"Failed to clear package %s",
743 if (prefix.IsNull()) {
744 Warning(
"EnableSpecial",
"No prefix specified");
747 const char* prf = prefix.Data();
748 if (parName.IsNull()) {
749 Warning(
"EnableSpecial",
"No par name specified for %s", prf);
754 params->SetOwner(
true);
758 params->Add(
new TNamed(Form(
"%s_EXTRA_LIBS", prf), tmp.Data()));
762 params->Add(
new TNamed(Form(
"%s_MODE", prf),
765 params->Add(
new TNamed(Form(
"%s_MODE", prf),
"default"));
769 params->Add(
new TNamed(Form(
"%s_ENABLE_ALIEN", prf),
"1"));
772 Int_t ret = gProof->EnablePackage(parName.Data(), params,
true);
774 Error(
"ProofRailway::EnableSpecial",
"Failed to enable %s PAR %s",
775 prf, parName.Data());
808 Error(
"ProofRailway::PostSetup",
"No analysis manager defined");
820 gProof->ClearFeedback();
830 TString name = TString::Format(
"%s_auxfiles", mgr->GetName());
833 if (gProof->UploadPackage(name.Data(), TProof::kRemoveOld) < 0)
834 Error(
"ProofRailway::PostSetup",
"Failed to upload PAR file %s",
845 while ((obj = next())) {
847 Int_t ret = gProof->EnablePackage(obj->GetName(),
true);
849 Error(
"ProofRailway::PostSetup",
"Failed to enable PAR %s",
864 while ((obj = next2())) {
865 Int_t ret = gProof->Load(Form(
"%s+g", obj->GetName()),
true);
867 Error(
"ProofRailway::PostSetup",
"Failed to compile %s",obj->GetName());
880 dsname =
fUrl.GetFile();
892 gProof->SetLogLevel(TMath::Max(
fVerbose-2,0),
899 TProofDebug::kPackage);
908 Warning(
"Run",
"Number of events %lld < offset (%lld), stopping",
912 Printf(
"Running proof analysis on data set %s", dsName.Data());
916 ret = mgr->StartAnalysis(
fUrl.GetProtocol(), chain,
nEvents, off);
918 ret = mgr->StartAnalysis(
fUrl.GetProtocol(), dsName.Data(),
nEvents, off);
921 if (!store.IsNull() && store.EqualTo(
"auto",TString::kIgnoreCase))
925 TProof::Mgr(
fUrl.GetUrl())->GetSessionLogs()->Save(
"*",
"proof.log");
931 if (path.IsNull())
return true;
934 Int_t nTokens = tokens->GetEntries();
936 Error(
"AddMonitor",
"Monitors must be of the form:\n" 937 " <task>[:<slot>]/<name>\n" 938 " <task>[:<slot>]/<path>/<name>");
945 TString& sTask =
static_cast<TObjString*
>(tokens->At(0))->String();
947 Ssiz_t colon = sTask.Index(
":");
948 if (colon != kNPOS) {
949 TString sSlot = sTask(colon+1, sTask.Length()-colon-1);
950 if (!sSlot.IsNull()) slotNo = sSlot.Atoi();
951 sTask.Remove(colon, sTask.Length()-colon);
956 Error(
"AddMonitor",
"Task \"%s\" not registered with manager",
960 AliAnalysisDataSlot* slot = task->GetOutputSlot(slotNo);
962 Error(
"AddMonitor",
"Task \"%s\" does not have an output slot at %d",
963 task->GetName(), slotNo);
966 AliAnalysisDataContainer* cont = slot->GetContainer();
968 Error(
"AddMonitor",
"Output slot %d of task \"%s\" has no container",
969 slotNo, task->GetName());
973 TString& first =
static_cast<TObjString*
>(tokens->At(idx))->String();
974 if (first.EqualTo(cont->GetName())) {
977 TObject* data = cont->GetData();
979 for (; idx < nTokens; idx++) {
992 std::cout << std::boolalpha
993 <<
" --- Other settings -------\n" 994 <<
" Extra libraries : " <<
fExtraLibs <<
"\n" 997 <<
" Use PARs of tasks: " <<
fUsePars <<
"\n" 999 << std::noboolalpha << std::endl;
1013 if (!name.BeginsWith(
"/")) {
1018 if (ret && name.EndsWith(
".root")) {
1019 TFile*
file = TFile::Open(name,
"READ");
1021 Info(
"AuxFile",
"Adding input file %s", name.Data());
1022 gProof->AddInputData(file,
true);
1030 Int_t bufSize = 32768;
1033 Long_t
id = 0, flags = 0, modtime = 0;
1034 if (
gSystem->GetPathInfo(fileName.Data(), &id, &size, &flags, &modtime)==1
1036 Error(
"SendFile",
"Cannot stat %s", fileName.Data());
1044 Int_t fd = open(fileName.Data(), O_RDONLY);
1045 while ((sl = static_cast<TSlave*>(next()))) {
1046 if (!sl->IsValid())
continue;
1047 if (sl->GetSlaveType() != TSlave::kSlave)
continue;
1050 snprintf(buf,bufSize,
"%s %d %lld %d", fn.Data(), 1, size, 0);
1051 if (sl->GetSocket()->Send(buf, kPROOF_SENDFILE) == -1) {
1052 Warning(
"SendFile",
"Could not send kPROOF_SENDFILE request");
1057 lseek(fd, 0, SEEK_SET);
1060 while ((len = read(fd, buf, bufSize)) < 0 &&
1061 TSystem::GetErrno() == EINTR)
1062 TSystem::ResetErrno();
1064 Error(
"SendFile",
"error reading input");
1068 if (len > 0 && sl->GetSocket()->SendRaw(buf, len) == -1) {
1069 Error(
"SendFile",
"error writing to slave");
1094 ret = Form(
"/%s/%s/", gProof->GetGroup(), gProof->GetUser());
1105 "proof://<host>[:<port>]/[<dataset>|<path>][?<options>][#<treeName>]";
1110 virtual const char*
Desc()
const {
return "PROOF"; }
1121 TFile* mgr = TFile::Open(Form(
"%s.root", escaped.Data()),
"RECREATE");
1122 AliAnalysisManager::GetAnalysisManager()->Write();
1128 TString macDir(
"$ALICE_PHYSICS/PWGLF/FORWARD/trains");
1129 std::ofstream t(
"Terminate.C");
1131 Error(
"ProofRailway::AuxSave",
"Failed to make terminate ROOT script");
1135 t <<
"// Generated by ProofRailway\n" 1136 <<
"Bool_t Terminate()\n" 1138 <<
" TString name = \"" << escaped <<
"\";\n" 1139 <<
" TString libs = \"" << libs <<
"\";\n" 1140 <<
" TString pars = \"" << pars <<
"\";\n" 1141 <<
" TString srcs = \"" << srcs <<
"\";\n\n" 1142 <<
" gSystem->Load(\"libANALYSIS\");\n" 1143 <<
" gSystem->Load(\"libANALYSISalice\");\n" 1144 <<
" gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");\n\n" 1145 <<
" gSystem->AddIncludePath(\"-I$ALICE_PHYSICS/include\");\n\n" 1146 <<
" gROOT->LoadMacro(\"" << macDir <<
"/ParUtilities.C+g\");\n\n" 1147 <<
" gROOT->LoadMacro(\"" << macDir <<
"/ProofTerminate.C+g\");\n\n" 1148 <<
" return ProofTerminate(name,libs,pars,srcs);\n"
virtual Bool_t LoadLibrary(const TString &name, Bool_t slaves=true, Bool_t forcePar=false)
ProofRailway & operator=(const ProofRailway &o)
virtual Bool_t CreateAliPhysicsPar()
virtual Bool_t CreatePseudoPar(const TString &parName, const TString &env, const TString &setup)
const TString & MakeLibraryName(const TString &name)
virtual Bool_t EnableAliPhysics()
virtual void Print(Option_t *="") const
Railway & operator=(const Railway &)
virtual const char * Desc() const
virtual Bool_t AuxFile(TString &name, bool copy=false)
static Bool_t StopXrootd()
Long64_t AsLong(const TString &name, Long64_t def=0) const
const TString & Get(const TString &name) const
virtual Bool_t PreSetup()
virtual const char * ModeString() const
virtual Bool_t Connect(const TUrl &url, const TString &opts)
virtual const char * AliPhysicsParName() const
void AuxSave(const TString &escaped, Bool_t)
Bool_t Has(const TString &name) const
virtual Bool_t CreateAliROOTPar()
static Bool_t Find(const TString &what)
ProofRailway(const ProofRailway &o)
virtual Bool_t EnableSpecial(const TString &parName, const TString &prefix)
virtual void GetDataSet(TString &dsname)
virtual Bool_t AddIncludePath(const TString &path)
Double_t nEvents
plot quality messages
virtual Bool_t LoadSource(const TString &name, bool copy=true)
virtual Bool_t AuxFile(TString &name, bool copy=false)
virtual void Print(Option_t *option="") const
virtual Long64_t Run(Long64_t nEvents=-1)
virtual Bool_t PostSetup()
virtual Bool_t AddMonitor(const TString &)
virtual Bool_t LoadAliROOT()
virtual UShort_t Mode() const
virtual Bool_t AddIncludePath(const TString &path)
virtual Bool_t EnableAliROOT()
Option * Add(const TString &name, const TString &arg, const TString &desc, const TString &val="")
virtual const Char_t * UrlHelp() const
Int_t SendFile(const TString &fileName)
virtual Bool_t LoadExtraSrcs()
TFile * file
TList with histograms for a given trigger.
Base class for analysis helpers.
virtual const char * AliROOTParName() const
virtual Bool_t LoadAliPhysics()
static TString RegisteredDataset()
virtual TString OutputPath() const
static Bool_t Build(const TString &what)
static Bool_t RegisterDataset(const TString &dsname)
static Bool_t MakeAuxFilePAR(const TList &files, const TString &name, Bool_t verbose=false)
static void ExportEnvVar(TString &out, const TString &name)
ProofRailway(const TUrl &url, Int_t verbose)
virtual Bool_t LoadSource(const TString &name, bool copy=false)
static Bool_t RegisterStorage(const TString &url)