1 /*
2 Copyright (C) 2000 - 2007 Grid Systems, S.A.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2, as
6 published by the Free Software Foundation.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16 */
17
18 package com.gridsystems.innergrid.kernel.genericutils;
19
20 import java.lang.reflect.Array;
21 import java.util.ArrayList;
22
23 import com.gridsystems.exceptions.PagedRequestNoDataException;
24 import com.gridsystems.innergrid.kernel.server.KernelContext;
25
26 /**
27 * Base class for asynchronous paged requests.
28 * <p>
29 * <b>Subclassing</b>
30 * <p>
31 * Subclasses must implement the {@link #run()} method, and do it fill the
32 * {@link #results} list.
33 * <p>
34 * The implementation should correctly (and regularly) check for interruption condition,
35 * as it is the mechanism used by PagedRequestManager to cancel request executions.
36 * <p>
37 * <b>Usage</b>
38 * <p>
39 * A typical iteration loop over a paged request that gets the results, concurrently to
40 * the request execution, will have the following layout:
41 * <p>
42 * <code>
43 * PageRequestManager<MyBean> myManager = new PageRequestManager<MyBean>();
44 * ...
45 * PagedRequest<MyBean> req = myManager.getPagedRequest(id);<br>
46 * <br>
47 * <b>while</b> (<b>true</b>) {<br>
48 * try {<br>
49 * MyBean items[] = req.getResults(results.size(), N);<br>
50 * <b>if</b> (items == <b>null</b>) <b>break</b>; // No more items<br>
51 * <i>// do something whith items</i><br>
52 * }<br>
53 * <b>catch</b> (PagedRequestTimeoutException e) {<br>
54 * <i>// do nothing</i><br>
55 * }<br>
56 * <b>catch</b> (PagedRequestNoDataException e) {<br>
57 * <i>// sleep some time</i><br>
58 * }<br>
59 * }
60 * </code>
61 * <p>
62 * Or, if we just want to wait until the request execution is finished, and get all its
63 * results in one operation, we can do:
64 * <p>
65 * <code>
66 * PagedRequest req = ...;<br>
67 * req.join();<br>
68 * <br>
69 * <b>int</b> count = req.getResultsCount();<br>
70 * Object results[] = (count == 0) ? <b>new</b> Object[0] : req.getResults(0, count);
71 * </code>
72 *
73 * @author lgaspart
74 * @author rruiz
75 * @author Xmas (generics)
76 * @version 1.0
77 * @param <E> Object type to be paged.
78 */
79 public abstract class PagedRequest<E extends Object> extends Thread {
80 /**
81 * Amount of time to sleep between checking if the request has finished.
82 */
83 private static final long INTERRUPT_SLEEP_DELTA = 1000;
84
85 /**
86 * Maximum amount of time to sleep when checking if the request has finished.
87 */
88 private static final long INTERRUPT_SLEEP = 1000;
89
90 /**
91 * Request sleep time during results retrieval.
92 */
93 private static final int SLEEP_TIME = 500;
94
95 /**
96 * Maximum allowed sleep time during results retrieval.
97 */
98 private static final int MAX_SLEEP_TIME = 60000;
99
100 /**
101 * Token used for synchronization.
102 */
103 private static final Object SYNC_TOKEN = new Object();
104
105 /**
106 * Request counter.
107 */
108 private static long counter = 0;
109
110 /**
111 * Request identifier.
112 */
113 private long id = 0;
114
115 /**
116 * Request user information.
117 */
118 protected KernelContext originalContext = null;
119
120 /**
121 * Request time.
122 */
123 private long requestTime = 0;
124
125 /**
126 * The time of the last data access.
127 */
128 private long lastAccessTime = 0;
129
130 /**
131 * Must block execution on request.
132 */
133 private boolean block = false;
134
135 /**
136 * Results of the request.
137 */
138 protected ArrayList<E> results = new ArrayList<E>();
139
140 /**
141 * Class of type E.
142 */
143 protected final Class<E> theClass;
144
145 /**
146 * Creates a new instance.
147 *
148 * @param theClass Class of type E
149 * @param context The request context
150 * @param blockRequest true if the request must be blocking
151 */
152 public PagedRequest(KernelContext context, Class<E> theClass, boolean blockRequest) {
153 super();
154 this.setDaemon(true);
155
156 synchronized (PagedRequest.SYNC_TOKEN) {
157 this.id = PagedRequest.counter++;
158 }
159 this.theClass = theClass;
160 this.requestTime = System.currentTimeMillis();
161 this.lastAccessTime = System.currentTimeMillis();
162 this.originalContext = (KernelContext)context.clone();
163 this.block = blockRequest;
164 }
165
166 /**
167 * Subclasses must implement the request in this method.
168 */
169 public abstract void run();
170
171
172 /**
173 * Gets the identifier of the paged request.
174 *
175 * @return this paged request identifier
176 */
177 public long getId() {
178 return this.id;
179 }
180
181 /**
182 * Gets the user of the paged request.
183 *
184 * @return the user of this paged request
185 */
186 public String getUser() {
187 return originalContext.getUserName();
188 }
189
190 /**
191 * Gets the hour of the paged request.
192 *
193 * @return the hour of this paged request
194 */
195 public long getTime() {
196 return requestTime;
197 }
198
199 /**
200 * Gets the number of results of the paged request.
201 *
202 * @return the number of results in this request
203 */
204 public int getResultsCount() {
205 return (this.results == null) ? 0 : this.results.size();
206 }
207
208 /**
209 * Gets the time of the last access to this request data.
210 *
211 * @return the last access time
212 */
213 public long getLastAccessTime() {
214 return lastAccessTime;
215 }
216
217 /**
218 * Blocks until the request execution has finished, there are at least from+count
219 * results in the pool, the current thread is interrupted or the specified timeout
220 * has been reached.
221 *
222 * @param from The offset of the first result to get
223 * @param count The number of results to get
224 * @param timeout The maximum wait time interval. Use -1 for no timeout
225 * @throws PagedRequestNoDataException If the timeout is reached and no results are
226 * available, or the thread is interrupted.
227 */
228 private void waitForResults(int from, int count, long timeout)
229 throws PagedRequestNoDataException {
230
231 long ts1 = System.currentTimeMillis();
232 this.lastAccessTime = ts1;
233
234 int size = from + count;
235 while (isAlive() && size > this.results.size()) {
236
237 // Controls timeout condition
238 if (timeout != -1 && lastAccessTime - ts1 >= timeout) {
239 // Only throw timeout if there are no results at all
240 if (this.results.size() <= from) {
241 throw new PagedRequestNoDataException();
242 } else {
243 break;
244 }
245 }
246
247 // Waits for more results to be generated
248 try {
249 sleep(SLEEP_TIME);
250 } catch (InterruptedException e) {
251 throw new PagedRequestNoDataException();
252 }
253
254 // Updates last access time, as there is somebody waiting
255 this.lastAccessTime = System.currentTimeMillis();
256 }
257 }
258
259 /**
260 * Asks for 'count' results, starting at offset 'from'. If not enough data is
261 * initially available, it will block during at most 'timeout' milliseconds.
262 * <p>
263 * The returned value depends on the following rules:
264 *
265 * <ul>
266 * <li>If, after the timeout interval, there are no available results, but the
267 * request execution is still in progress, a
268 * {@link com.gridsystems.exceptions.PagedRequestNoDataException}
269 * will be thrown.
270 * <li>The same exception will be thrown if the current thread is interrupted.
271 * <li>If, after the timeout interval, there are one or more results,
272 * they will be returned in an array, even if there are less items than
273 * requested.
274 * <li>If the request execution has finished, and there are no results in the
275 * specified range, <code>null</code> will be returned. This marks the
276 * end condition for a retrieval loop.
277 * </ul>
278 *
279 * @param from The offset of the first result to get
280 * @param count The number of results to get
281 * @param timeout The max blocking time
282 * @return An array with the obtained results, or null if the request is finished
283 * and 'from' is out of the results bounds.
284 * @throws PagedRequestNoDataException In case of timeout
285 */
286 @SuppressWarnings("unchecked")
287 public E[] getResultsTimeout(int from, int count, long timeout)
288 throws PagedRequestNoDataException {
289 this.lastAccessTime = System.currentTimeMillis();
290 int requestedSize = from + count;
291
292 // Blocks until enough data available (or timeout)
293 waitForResults(from, count, timeout);
294
295 // Thread finished, timed out, or results available
296 int resultsSize = this.results.size();
297
298 if (from >= resultsSize) {
299 // We can only be here if the request execution has finished
300 return null;
301 } else {
302 // adjusts item count to the available data
303 if (requestedSize > resultsSize) {
304 count = resultsSize - from;
305 }
306
307 // Returns results into an array
308 ArrayList<E> array = new ArrayList<E>(count);
309 for (int i = 0; i < count; i++) {
310 array.add(this.results.get(from + i));
311 }
312 E[] newArray = (E[])Array.newInstance(theClass, count);
313 return array.toArray(newArray);
314 }
315 }
316
317 /**
318 * Calls getResultsTimeout() with an appropriate timeout value depending on
319 * the 'block' flag specified at construction time.
320 *
321 * @param from The initial offset
322 * @param count The number of results to retrieve
323 * @return An array with the results (either all, or a partial subset), or null
324 * if the request execution has finished and 'from' is out of bounds
325 * @throws PagedRequestNoDataException if no results are available yet
326 */
327 public E[] getResults(int from, int count) throws PagedRequestNoDataException {
328 return getResultsTimeout(from, count, block ? MAX_SLEEP_TIME : 0L);
329 }
330
331 /**
332 * Clears the Paged request.
333 */
334 public void clear() {
335 // Interrupt the thread if it is not yet
336 if (!this.isInterrupted()) {
337 this.interrupt();
338 }
339
340 // Wait for normal finalization
341 long start = System.currentTimeMillis();
342 long elapsed = 0;
343 while (this.isAlive() && elapsed < INTERRUPT_SLEEP) {
344 try {
345 Thread.sleep(INTERRUPT_SLEEP_DELTA);
346 } catch (InterruptedException e) {
347 break;
348 }
349 elapsed = System.currentTimeMillis() - start;
350 }
351 }
352 }