001
014
015 package com.liferay.portal.kernel.process;
016
017 import com.liferay.portal.kernel.io.unsync.UnsyncBufferedInputStream;
018 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
019 import com.liferay.portal.kernel.log.Log;
020 import com.liferay.portal.kernel.log.LogFactoryUtil;
021 import com.liferay.portal.kernel.process.log.ProcessOutputStream;
022 import com.liferay.portal.kernel.util.NamedThreadFactory;
023 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
024
025 import java.io.EOFException;
026 import java.io.IOException;
027 import java.io.InputStream;
028 import java.io.ObjectInputStream;
029 import java.io.ObjectOutputStream;
030 import java.io.OutputStream;
031 import java.io.PrintStream;
032 import java.io.Serializable;
033 import java.io.StreamCorruptedException;
034
035 import java.util.concurrent.Callable;
036 import java.util.concurrent.ExecutorService;
037 import java.util.concurrent.Executors;
038 import java.util.concurrent.Future;
039
040
043 public class ProcessExecutor {
044
045 public static <T extends Serializable> T execute(
046 ProcessCallable<T> processCallable, String classPath)
047 throws ProcessException {
048
049 try {
050 ProcessBuilder processBuilder = new ProcessBuilder(
051 "java", "-cp", classPath, ProcessExecutor.class.getName());
052
053 Process process = processBuilder.start();
054
055 _writeObject(process.getOutputStream(), processCallable);
056
057 ExecutorService executorService = _getExecutorService();
058
059 SubprocessReactor subprocessReactor = new SubprocessReactor(
060 process.getInputStream());
061
062 Future<ProcessCallable<?>> futureResponseProcessCallable =
063 executorService.submit(subprocessReactor);
064
065 int exitCode = process.waitFor();
066
067 if (exitCode != 0) {
068 throw new ProcessException(
069 "Subprocess terminated with exit code " + exitCode);
070 }
071
072 ProcessCallable<?> responseProcessCallable =
073 futureResponseProcessCallable.get();
074
075 if (responseProcessCallable instanceof ReturnProcessCallable<?>) {
076 return (T)responseProcessCallable.call();
077 }
078
079 if (responseProcessCallable instanceof ExceptionProcessCallable) {
080 ExceptionProcessCallable exceptionProcessCallable =
081 (ExceptionProcessCallable)responseProcessCallable;
082
083 throw exceptionProcessCallable.call();
084 }
085
086 if (_log.isWarnEnabled()) {
087 _log.warn(
088 "Subprocess reactor exited without a valid return " +
089 "because the subprocess terminated with an exception");
090 }
091
092 return null;
093 }
094 catch (ProcessException pe) {
095 throw pe;
096 }
097 catch (Exception e) {
098 throw new ProcessException(e);
099 }
100 }
101
102 public static void main(String[] arguments)
103 throws ClassNotFoundException, IOException {
104
105 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
106 System.out);
107
108 ProcessOutputStream outProcessOutputStream = new ProcessOutputStream(
109 objectOutputStream, false);
110
111 PrintStream outPrintStream = new PrintStream(
112 outProcessOutputStream, true);
113
114 System.setOut(outPrintStream);
115
116 ProcessOutputStream errProcessOutputStream = new ProcessOutputStream(
117 objectOutputStream, true);
118
119 PrintStream errPrintStream = new PrintStream(
120 errProcessOutputStream, true);
121
122 System.setErr(errPrintStream);
123
124 try {
125 ProcessCallable<?> processCallable =
126 (ProcessCallable<?>)_readObject(System.in, false);
127
128 Serializable result = processCallable.call();
129
130 outPrintStream.flush();
131
132 outProcessOutputStream.writeProcessCallable(
133 new ReturnProcessCallable<Serializable>(result));
134
135 outProcessOutputStream.close();
136 }
137 catch (ProcessException pe) {
138 errPrintStream.flush();
139
140 errProcessOutputStream.writeProcessCallable(
141 new ExceptionProcessCallable(pe));
142
143 errProcessOutputStream.close();
144 }
145 }
146
147 public void destroy() {
148 if (_executorService == null) {
149 return;
150 }
151
152 synchronized (ProcessExecutor.class) {
153 if (_executorService != null) {
154 _executorService.shutdownNow();
155
156 _executorService = null;
157 }
158 }
159 }
160
161 private static ExecutorService _getExecutorService() {
162 if (_executorService != null) {
163 return _executorService;
164 }
165
166 synchronized (ProcessExecutor.class) {
167 if (_executorService == null) {
168 _executorService = Executors.newCachedThreadPool(
169 new NamedThreadFactory(
170 ProcessExecutor.class.getName(), Thread.MIN_PRIORITY,
171 PortalClassLoaderUtil.getClassLoader()));
172 }
173 }
174
175 return _executorService;
176 }
177
178 private static Object _readObject(InputStream inputStream, boolean close)
179 throws ClassNotFoundException, IOException {
180
181 ObjectInputStream objectInputStream = new ObjectInputStream(
182 inputStream);
183
184 try {
185 return objectInputStream.readObject();
186 }
187 finally {
188 if (close) {
189 objectInputStream.close();
190 }
191 }
192 }
193
194 private static void _writeObject(OutputStream outputStream, Object object)
195 throws IOException {
196
197 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
198 outputStream);
199
200 try {
201 objectOutputStream.writeObject(object);
202 }
203 finally {
204 objectOutputStream.close();
205 }
206 }
207
208 private static Log _log = LogFactoryUtil.getLog(ProcessExecutor.class);
209
210 private static volatile ExecutorService _executorService;
211
212 private static class SubprocessReactor
213 implements Callable<ProcessCallable<? extends Serializable>> {
214
215 public SubprocessReactor(InputStream inputStream) {
216 _unsyncBufferedInputStream = new UnsyncBufferedInputStream(
217 inputStream);
218 }
219
220 public ProcessCallable<? extends Serializable> call() throws Exception {
221 try {
222 ObjectInputStream objectInputStream = null;
223
224 UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
225 new UnsyncByteArrayOutputStream();
226
227 while (true) {
228 try {
229
230
231
232 _unsyncBufferedInputStream.mark(4);
233
234 objectInputStream =
235 new PortalClassLoaderObjectInputStream(
236 _unsyncBufferedInputStream);
237
238
239
240
241 if (unsyncByteArrayOutputStream.size() > 0) {
242 if (_log.isWarnEnabled()) {
243 _log.warn(
244 "Found corrupted leading log: " +
245 unsyncByteArrayOutputStream.toString());
246 }
247 }
248
249 unsyncByteArrayOutputStream = null;
250
251 break;
252 }
253 catch (StreamCorruptedException sce) {
254
255
256
257 _unsyncBufferedInputStream.reset();
258
259 unsyncByteArrayOutputStream.write(
260 _unsyncBufferedInputStream.read());
261 }
262 }
263
264 while (true) {
265 ProcessCallable<?> processCallable =
266 (ProcessCallable<?>)objectInputStream.readObject();
267
268 if (processCallable instanceof ExceptionProcessCallable) {
269 return processCallable;
270 }
271
272 if (processCallable instanceof ReturnProcessCallable<?>) {
273 return processCallable;
274 }
275
276 Serializable result = processCallable.call();
277
278 if (_log.isDebugEnabled()) {
279 _log.debug(
280 "Invoked generic process callable " +
281 processCallable + " with return value " +
282 result);
283 }
284 }
285 }
286 catch (EOFException eofe) {
287 }
288
289 return null;
290 }
291
292 private final UnsyncBufferedInputStream _unsyncBufferedInputStream;
293
294 }
295
296 }