-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathExecutorService.cfc
More file actions
executable file
·107 lines (86 loc) · 3.77 KB
/
ExecutorService.cfc
File metadata and controls
executable file
·107 lines (86 loc) · 3.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/**
 * Executor service that submits tasks to a thread pool executor obtained from the ObjectFactory.
 * The work queue and the executor are built in start(); tasks are wrapped in proxies created by
 * the ObjectFactory before being handed to the submission target.
 */
component extends="AbstractExecutorService" accessors="true" output="false"{

	// Expose the internals: power users need direct access to the queue and the executor.
	property name="workQueue";
	property name="workExecutor";

	/**
	 * Configures the service. Nothing is created here; the queue and executor are built by start().
	 *
	 * @serviceName The unique application name for this executor service
	 * @maxConcurrent The maximum number of tasks which will be run at one time. A value of 0 (or less) causes maxConcurrent to be calculated as getProcessorCount() + 1
	 * @maxWorkQueueSize Capacity handed to objectFactory.createQueue() when the work queue is built in start()
	 * @objectFactory Factory used to create the queue, the executor and the task proxies
	 * @rejectionPolicy Policy name passed verbatim to objectFactory.createThreadPoolExecutor(). Defaults to "DiscardPolicy", the value that was previously hard-coded. Accepted values depend on the ObjectFactory implementation.
	 */
	public function init( serviceName, numeric maxConcurrent=0, numeric maxWorkQueueSize=10000, objectFactory="#createObject('component', 'ObjectFactory').init()#", string rejectionPolicy="DiscardPolicy" ){
		super.init( serviceName, objectFactory );

		// Copy every argument (including rejectionPolicy) into the variables scope for later use by start()
		structAppend( variables, arguments );

		if( maxConcurrent LTE 0 ){
			variables.maxConcurrent = getProcessorCount() + 1;
		}

		return this;
	}

	/**
	 * Builds the work queue and the executor, registers the executor as the submission target,
	 * stores it via storeExecutor() so it can be torn down later, then delegates to super.start().
	 *
	 * NOTE(review): with the default "DiscardPolicy" the executor presumably drops rejected tasks
	 * silently (e.g. when the queue is full); confirm how invokeAll() behaves for such tasks.
	 */
	public function start(){
		// Fall back to the historical default if init() was bypassed (e.g. a subclass with its own init)
		var policy = "DiscardPolicy";
		if( structKeyExists( variables, "rejectionPolicy" ) ){
			policy = variables.rejectionPolicy;
		}

		variables.workQueue = objectFactory.createQueue( maxWorkQueueSize );
		variables.workExecutor = objectFactory.createThreadPoolExecutor( maxConcurrent, workQueue, policy );
		setSubmissionTarget( workExecutor );

		// store the executor for sane destructability
		storeExecutor( "workExecutor", variables.workExecutor );

		return super.start();
	}

	/**
	 * Executes the tasks, returning the result of the submission target's invokeAll()
	 * (per the original documentation: an array of Futures, once all tasks complete).
	 * If the service is paused, the submission is logged and ignored and nothing is returned.
	 * If the service is stopped, an exception is thrown.
	 *
	 * @tasks An array of task instances. A task CFC must expose a call() method that returns a result
	 * @timeout Maximum time to wait. 0 (or less) indicates to wait until completion
	 * @timeUnit TimeUnit of the timeout argument. Defaults to objectFactory.SECONDS
	 */
	public function invokeAll( array tasks, timeout=0, timeUnit="#objectFactory.SECONDS#" ){
		if( isStarted() ){
			var proxies = toSubmittableProxies( tasks );

			if( timeout LTE 0 ){
				return getSubmissionTarget().invokeAll( proxies );
			} else {
				return getSubmissionTarget().invokeAll( proxies, timeout, timeUnit );
			}
		} else if( isPaused() ) {
			writeLog("Service paused... ignoring submission");
		} else if( isStopped() ){
			throw("Service is stopped... not accepting new tasks");
		}
	}

	/**
	 * Executes the tasks, returning the result of the submission target's invokeAny()
	 * (per the original documentation: the value returned by the call() method of one task
	 * that completed successfully, if any do).
	 * If the service is paused, the submission is logged and ignored and nothing is returned.
	 * If the service is stopped, an exception is thrown.
	 *
	 * @tasks An array of task instances. A task CFC must expose a call() method that returns a result
	 * @timeout Maximum time to wait. 0 (or less) indicates to wait until completion
	 * @timeUnit TimeUnit of the timeout argument. Defaults to objectFactory.SECONDS
	 */
	public function invokeAny( array tasks, timeout=0, timeUnit="#objectFactory.SECONDS#" ){
		if( isStarted() ){
			var proxies = toSubmittableProxies( tasks );

			if( timeout LTE 0 ){
				return getSubmissionTarget().invokeAny( proxies );
			} else {
				return getSubmissionTarget().invokeAny( proxies, timeout, timeUnit );
			}
		} else if( isPaused() ) {
			writeLog("Service paused... ignoring submission");
		} else if( isStopped() ){
			throw("Service is stopped... not accepting new tasks");
		}
	}

	/**
	 * Straight from the javadoc: Executes the given command at some time in the future. The command may execute in a new thread, in a pooled thread, or in the calling thread, at the discretion of the Executor implementation.
	 * This is the equivalent of fire-and-forget usage of cfthread
	 *
	 * NOTE(review): unlike invokeAll()/invokeAny(), this method does not check isStarted()/isPaused()/isStopped()
	 * before submitting; confirm whether that is intentional.
	 *
	 * @runnableTask A task instance that exposes a void run() method
	 */
	public function execute( runnableTask ){
		var proxy = objectFactory.createRunnableProxy( runnableTask );
		getSubmissionTarget().execute( proxy );
	}

	/**
	 * Wraps each task in a submittable proxy created by the ObjectFactory, preserving order.
	 *
	 * @tasks An array of task instances
	 */
	private array function toSubmittableProxies( array tasks ){
		var proxies = [];
		for( var task in tasks ){
			arrayAppend( proxies, objectFactory.createSubmittableProxy( task ) );
		}
		return proxies;
	}

}