-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathAbstractExecutorService.cfc
More file actions
executable file
·179 lines (145 loc) · 5.05 KB
/
AbstractExecutorService.cfc
File metadata and controls
executable file
·179 lines (145 loc) · 5.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
component output="false" accessors="true"{

	// Name that identifies this service; used as the key of its executor storage struct.
	property name="serviceName" type="string";
	// Lifecycle state. The methods in this component set it to "started", "paused" or "stopped".
	property name="status" type="string";
	// Factory used for submittable proxies, the default time unit and the processor count.
	property name="objectFactory";
	property name="maxConcurrent" type="numeric";
	// When false, logMessage() writes nothing.
	property name="loggingEnabled" type="boolean";
	// Default log file name used by logMessage().
	property name="logName" type="string";
	// Object that receives submit( proxy ) calls; expected to be set by implementers before submit() is used.
	property name="submissionTarget";

	/*
		The storage scope is a bucket, kept under application[ baseStorageScopeName ], into which
		we put "things that can shut down". It is a safeguard against developers who don't heed
		instructions to *ensure* that stop() is called onApplicationStop().

		Executors registered through storeExecutor() live in this scope. Storing an executor under
		a name that is already in use shuts the previous one down first, and shutdownAllExecutors()
		shuts down and removes everything this service has stored.
	*/
	property name="thisStorageScope" type="struct" setter="false";

	variables.logName = "cfconcurrent";
	variables.loggingEnabled = false;
	variables.baseStorageScopeName = "__cfconcurrent";
	variables.status = "stopped";

	/*
		WARNING: you may be tempted to call stop() either on init or on start. Do Not Do So.

		These services might use other services, and calling stop() would have the effect of undoing
		work that you *just* did -- any executors you create prior to calling start() on another
		service would be cancelled.

		It is the responsibility of the user of this library to call stop() on services he/she starts.
	*/

	/**
	* Constructor. Copies every supplied argument into the variables scope and returns this instance.
	* @serviceName Name identifying this service; used as the key for its executor storage
	* @objectFactory Factory object; when omitted a new ObjectFactory component is created and initialized
	*/
	public function init( String serviceName, objectFactory="#createObject('component', 'ObjectFactory').init()#" ){
		structAppend( variables, arguments );
		return this;
	}

	/* Lifecycle methods */

	/**
	* Marks the service as started and returns this instance. This will be overridden by implementers.
	*/
	public function start(){
		variables.status = "started";
		return this;
	}

	/**
	* Shuts down every executor stored for this service, then marks the service as stopped.
	* @timeout How long to wait for each executor to terminate before forcing it
	* @timeUnit Unit of the timeout; defaults to objectFactory.MILLISECONDS
	*/
	public function stop( timeout=100, timeUnit="#objectFactory.MILLISECONDS#" ){
		shutdownAllExecutors( timeout, timeUnit );
		variables.status = "stopped";
		return this;
	}

	/**
	* Marks the service as paused and returns this instance. While paused, submit() logs and ignores tasks.
	*/
	public function pause(){
		variables.status = "paused";
		return this;
	}

	/**
	* Leaves the paused state. A stopped service is started through start() so that
	* overriding implementations run their start logic; otherwise the status is simply set to "started".
	*/
	public function unPause(){
		if( isStopped() ){
			start();
		} else {
			variables.status = "started";
		}
		return this;
	}

	/** True when the status is "started". */
	public function isStarted(){
		return getStatus() eq "started";
	}

	/** True when the status is "stopped". */
	public function isStopped(){
		return getStatus() eq "stopped";
	}

	/** True when the status is "paused". */
	public function isPaused(){
		return getStatus() eq "paused";
	}

	/* Execution methods */

	/**
	* Submits an object for execution. Returns a Future if Callable and a RunnableFuture if Runnable.
	* If the service is paused, the submission is logged and ignored and nothing is returned.
	* If the service is stopped, an exception is thrown.
	* @task A task instance. Object must expose either a call() or run() method.
	*/
	public function submit( task ){
		if( isStarted() ){
			// Wrap the task in a proxy from the object factory and hand it to the submission target.
			var proxy = objectFactory.createSubmittableProxy( task );
			return getSubmissionTarget().submit( proxy );
		} else if( isPaused() ) {
			writeLog("Service paused... ignoring submission");
		} else if( isStopped() ){
			throw("Service is stopped... not accepting new tasks");
		}
	}

	/* Storage methods for shutdown management */

	/**
	* application["__cfconcurrent"]
	* Created as an empty struct on first access.
	*/
	public function getBaseStorageScope(){
		if( NOT structKeyExists( application, variables.baseStorageScopeName ) ){
			application[variables.baseStorageScopeName] = {};
		}
		return application[variables.baseStorageScopeName];
	}

	/**
	* application["__cfconcurrent"][application.applicationname]
	* Created as an empty struct on first access.
	*/
	public function getApplicationStorageScope(){
		var baseScope = getBaseStorageScope();
		if( NOT structKeyExists( baseScope, application.applicationname ) ){
			baseScope[application.applicationname] = {};
		}
		// Return the per-application struct, not the base scope, so that each service's
		// storage really is nested under the application name.
		return baseScope[application.applicationname];
	}

	/**
	* application["__cfconcurrent"][application.applicationname][ getServiceName() ]
	* Created as an empty struct on first access.
	*
	* serviceName does NOT have to be application.applicationname... it should be a name that *uniquely* identifies
	* this concurrency service in the application.
	* If the application has multiple concurrency services, they must be uniquely named.
	*/
	public function getThisStorageScope(){
		var scope = getApplicationStorageScope();
		if( NOT structKeyExists( scope, serviceName) ){
			scope[serviceName] = {};
		}
		return scope[serviceName];
	}

	/**
	* Writes a log entry when logging is enabled; does nothing otherwise. Returns this instance.
	* @text Message to write
	* @file Log file name; defaults to this service's logName
	*/
	public function logMessage( String text, String file="#logName#" ){
		if( getLoggingEnabled() ){
			writeLog( text=arguments.text, file=arguments.file );
		}
		return this;
	}

	/**
	* Returns the processor count reported by the object factory.
	*/
	public function getProcessorCount(){
		return objectFactory.getProcessorCount();
	}

	/**
	* Registers an executor in this service's storage scope so that stop() can shut it down later.
	* If an executor is already stored under the same name, it is shut down immediately and replaced.
	* @name Key under which the executor is stored
	* @executor Executor instance; must expose shutdown(), shutdownNow() and awaitTermination()
	*/
	package function storeExecutor( string name, any executor ){
		var scope = getThisStorageScope();
		if( structKeyExists( scope, arguments.name ) ){
			logMessage( "Executor named #arguments.name# already existed. Shutting it down and replacing it" );
			scope[ arguments.name ].shutdownNow();
		}
		scope[ arguments.name ] = arguments.executor;
		return this;
	}

	/**
	* Shuts down every executor stored for this service and then empties the storage scope.
	* Each executor is asked to shut down, given the timeout to finish, and forced with
	* shutdownNow() if it has not terminated by then. Returns this instance.
	* @timeout How long to wait for each executor to terminate
	* @timeUnit Unit of the timeout; defaults to objectFactory.MILLISECONDS
	*/
	package function shutdownAllExecutors( timeout=100, timeUnit="#objectFactory.MILLISECONDS#" ){
		var scope = getThisStorageScope();
		if( structIsEmpty( scope ) ){
			return this;
		}

		for( var executorName in scope ){
			var executor = scope[ executorName ];
			writeLog("Waiting #timeout# #timeUnit# for tasks to complete and then shutting down executor named #executorName#");
			executor.shutdown();
			var stopped = executor.awaitTermination( timeout, timeUnit );
			if( NOT stopped ){
				executor.shutdownNow();
			}
		}
		structClear( scope );
		return this;
	}

}