@@ -32,6 +32,7 @@ import (
3232	"github.com/elastic/elastic-package/internal/multierror" 
3333	"github.com/elastic/elastic-package/internal/packages" 
3434	"github.com/elastic/elastic-package/internal/servicedeployer" 
35+ 	"github.com/elastic/elastic-package/internal/testrunner/runners/system" 
3536	"github.com/elastic/elastic-package/internal/wait" 
3637)
3738
@@ -169,14 +170,6 @@ func (r *runner) setUp(ctx context.Context) error {
169170		return  fmt .Errorf ("reading package manifest failed: %w" , err )
170171	}
171172
172- 	policy , err  :=  r .createBenchmarkPolicy (ctx , pkgManifest )
173- 	if  err  !=  nil  {
174- 		return  err 
175- 	}
176- 	r .benchPolicy  =  policy 
177- 
178- 	// Delete old data 
179- 	logger .Debug ("deleting old data in data stream..." )
180173	dataStreamManifest , err  :=  packages .ReadDataStreamManifest (
181174		filepath .Join (
182175			common .DataStreamPath (r .options .PackageRootPath , r .scenario .DataStream .Name ),
@@ -187,6 +180,12 @@ func (r *runner) setUp(ctx context.Context) error {
187180		return  fmt .Errorf ("reading data stream manifest failed: %w" , err )
188181	}
189182
183+ 	policy , err  :=  r .createBenchmarkPolicy (ctx , pkgManifest , dataStreamManifest )
184+ 	if  err  !=  nil  {
185+ 		return  err 
186+ 	}
187+ 	r .benchPolicy  =  policy 
188+ 
190189	r .runtimeDataStream  =  fmt .Sprintf (
191190		"%s-%s.%s-%s" ,
192191		dataStreamManifest .Type ,
@@ -210,6 +209,7 @@ func (r *runner) setUp(ctx context.Context) error {
210209		return  nil 
211210	}
212211
212+ 	logger .Debug ("deleting old data in data stream..." )
213213	if  err  :=  r .deleteDataStreamDocs (ctx , r .runtimeDataStream ); err  !=  nil  {
214214		return  fmt .Errorf ("error deleting old data in data stream: %s: %w" , r .runtimeDataStream , err )
215215	}
@@ -234,6 +234,7 @@ func (r *runner) run(ctx context.Context) (report reporters.Reportable, err erro
234234		s , err  :=  r .setupService (ctx )
235235		if  errors .Is (err , os .ErrNotExist ) {
236236			logger .Debugf ("No service deployer defined for this benchmark" )
237+ 			logger .Debugf ("The error is: %s" , err .Error ())
237238		} else  if  err  !=  nil  {
238239			return  nil , err 
239240		}
@@ -292,6 +293,7 @@ func (r *runner) setupService(ctx context.Context) (servicedeployer.DeployedServ
292293	// Setup service. 
293294	logger .Debug ("Setting up service..." )
294295	devDeployDir  :=  filepath .Clean (filepath .Join (r .options .BenchPath , "deploy" ))
296+ 	logger .Debugf ("Using service deployer dev directory: %s" , devDeployDir )
295297	opts  :=  servicedeployer.FactoryOptions {
296298		PackageRootPath :        r .options .PackageRootPath ,
297299		DevDeployDir :           devDeployDir ,
@@ -367,7 +369,7 @@ func (r *runner) deleteDataStreamDocs(ctx context.Context, dataStream string) er
367369	return  nil 
368370}
369371
370- func  (r  * runner ) createBenchmarkPolicy (ctx  context.Context , pkgManifest  * packages.PackageManifest ) (* kibana.Policy , error ) {
372+ func  (r  * runner ) createBenchmarkPolicy (ctx  context.Context , pkgManifest  * packages.PackageManifest ,  dataStreamManifest   * packages. DataStreamManifest ) (* kibana.Policy , error ) {
371373	// Configure package (single data stream) via Ingest Manager APIs. 
372374	logger .Debug ("creating benchmark policy..." )
373375	benchTime  :=  time .Now ().Format ("20060102T15:04:05Z" )
@@ -385,21 +387,53 @@ func (r *runner) createBenchmarkPolicy(ctx context.Context, pkgManifest *package
385387
386388	policy , err  :=  r .options .KibanaClient .CreatePolicy (ctx , p )
387389	if  err  !=  nil  {
388- 		return  nil , err 
390+ 		return  nil , fmt . Errorf ( "failed to create benchmark policy: %w" ,  err ) 
389391	}
390392
391- 	packagePolicy , err  :=  r .createPackagePolicy (ctx , pkgManifest , policy )
393+ 	if  r .scenario .PolicyTemplate  ==  ""  {
394+ 		policyTemplateName , err  :=  system .FindPolicyTemplateForInput (pkgManifest , dataStreamManifest , r .scenario .Input )
395+ 		if  err  !=  nil  {
396+ 			return  nil , fmt .Errorf ("failed to determine the associated policy_template: %w" , err )
397+ 		}
398+ 		r .scenario .PolicyTemplate  =  policyTemplateName 
399+ 	}
400+ 	policyTemplate , err  :=  system .SelectPolicyTemplateByName (pkgManifest .PolicyTemplates , r .scenario .PolicyTemplate )
392401	if  err  !=  nil  {
393- 		return  nil , err 
402+ 		return  nil , fmt .Errorf ("failed to find the selected policy_template: %w" , err )
403+ 	}
404+ 
405+ 	logger .Debug ("adding package data stream to test policy..." )
406+ 	ds , err  :=  system .CreatePackageDatastream (
407+ 		policy ,
408+ 		pkgManifest ,
409+ 		policyTemplate ,
410+ 		dataStreamManifest ,
411+ 		r .scenario .Input ,
412+ 		r .scenario .Vars ,
413+ 		r .scenario .DataStream .Vars ,
414+ 		policy .Namespace )
415+ 	if  err  !=  nil  {
416+ 		return  nil , fmt .Errorf ("could not create package data stream: %w" , err )
417+ 	}
418+ 
419+ 	if  err  :=  r .options .KibanaClient .AddPackageDataStreamToPolicy (ctx , ds ); err  !=  nil  {
420+ 		return  nil , fmt .Errorf ("could not add data stream config to policy: %w" , err )
394421	}
422+ 	logger .Debugf ("added data stream %s to policy" , ds .Name )
423+ 
424+ 	// packagePolicy, err := r.createPackagePolicy(ctx, pkgManifest, policy) 
425+ 	// if err != nil { 
426+ 	// 	return nil, fmt.Errorf("failed to create package policy: %w", err) 
427+ 	// } 
428+ 	// logger.Debugf("created package policy with id: %s", packagePolicy.ID) 
395429
396430	r .deletePolicyHandler  =  func (ctx  context.Context ) error  {
397431		var  merr  multierror.Error 
398432
399- 		logger .Debug ("deleting benchmark package policy..." )
400- 		if  err  :=  r .options .KibanaClient .DeletePackagePolicy (ctx , * packagePolicy ); err  !=  nil  {
401- 			merr  =  append (merr , fmt .Errorf ("error cleaning up benchmark package policy: %w" , err ))
402- 		}
433+ 		//  logger.Debug("deleting benchmark package policy...")
434+ 		//  if err := r.options.KibanaClient.DeletePackagePolicy(ctx, *packagePolicy); err != nil {
435+ 		//  	merr = append(merr, fmt.Errorf("error cleaning up benchmark package policy: %w", err))
436+ 		//  }
403437
404438		logger .Debug ("deleting benchmark policy..." )
405439		if  err  :=  r .options .KibanaClient .DeletePolicy (ctx , policy .ID ); err  !=  nil  {
0 commit comments