/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.rdf4j.federated.evaluation.concurrent;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.federated.evaluation.concurrent.NamingThreadFactory;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTask;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTaskBase;
import org.eclipse.rdf4j.federated.evaluation.concurrent.Scheduler;
import org.eclipse.rdf4j.federated.evaluation.concurrent.TaskWrapper;
import org.eclipse.rdf4j.federated.evaluation.concurrent.TaskWrapperAware;
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
import org.eclipse.rdf4j.federated.exception.FedXRuntimeException;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Scheduler} that runs {@link ParallelTask}s on a fixed-size pool of worker threads.
 *
 * <p>
 * Tasks handed to {@link #schedule(ParallelTask)} are wrapped in a {@link WorkerRunnable} (and, if a
 * {@link TaskWrapper} has been set, additionally wrapped by it) and submitted to an internal
 * {@link ThreadPoolExecutor} backed by an unbounded {@link LinkedBlockingQueue}.
 *
 * <p>
 * The per-execution callbacks without a control argument ({@link #handleResult(CloseableIteration)},
 * {@link #informFinish()}, {@link #isRunning()}, {@link #toss(Exception)}) are not supported by this
 * scheduler and always throw.
 *
 * @param <T> the element type produced by the scheduled tasks
 */
public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAware {
    private static final Logger log = LoggerFactory.getLogger(ControlledWorkerScheduler.class);

    /** Maximum time {@link #abort()} and {@link #shutdown()} wait for the pool to terminate. */
    private static final long TERMINATION_TIMEOUT_SECONDS = 30L;

    private final ExecutorService executor;
    /** Work queue of the executor; holds submitted tasks that no worker has picked up yet. */
    private final LinkedBlockingQueue<Runnable> _taskQueue = new LinkedBlockingQueue<>();
    private final int nWorkers;
    private final String name;
    private TaskWrapper taskWrapper;

    /**
     * Creates a scheduler with 20 workers named "FedX Worker".
     *
     * @deprecated use {@link #ControlledWorkerScheduler(int, String)} and state the pool size and name
     *             explicitly
     */
    @Deprecated
    public ControlledWorkerScheduler() {
        this(20, "FedX Worker");
    }

    /**
     * Creates a scheduler with the given number of worker threads.
     *
     * @param nWorkers number of worker threads (used as both core and maximum pool size)
     * @param name     name passed to the {@link NamingThreadFactory} and used in log messages
     */
    public ControlledWorkerScheduler(int nWorkers, String name) {
        this.nWorkers = nWorkers;
        this.name = name;
        this.executor = this.createExecutorService();
    }

    /**
     * Registers the task with its query info and submits it to the worker pool.
     *
     * <p>
     * If either the registration or the submission fails (the latter happens, for instance, when the
     * executor rejects the task after {@link #shutdown()} or {@link #abort()}), the task is cancelled
     * and the failure is rethrown to the caller.
     *
     * @param task the task to run; its control must not be finished yet
     */
    @Override
    public void schedule(ParallelTask<T> task) {
        assert (!task.getControl().isFinished());

        Runnable runnable = new WorkerRunnable(task);

        // Read the field once so that a concurrent setTaskWrapper(null) cannot slip in
        // between the null check and the call.
        TaskWrapper wrapper = this.taskWrapper;
        if (wrapper != null) {
            runnable = wrapper.wrap(runnable);
        }

        Future<?> future;
        try {
            task.getQueryInfo().registerScheduledTask(task);
            // Submission is guarded as well: a rejected task must not remain registered
            // without ever being cancelled.
            future = this.executor.submit(runnable);
        } catch (Throwable e) {
            task.cancel();
            throw e;
        }

        // Hand the future to the task so that it can be reached from the task later on.
        if (task instanceof ParallelTaskBase) {
            ((ParallelTaskBase<?>) task).setScheduledFuture(future);
        }
    }

    /**
     * Schedules every task of the list, in list order, via {@link #schedule(ParallelTask)}.
     *
     * @param tasks   the tasks to schedule
     * @param control not used by this implementation
     */
    public void scheduleAll(List<ParallelTask<T>> tasks, ParallelExecutor<T> control) {
        for (ParallelTask<T> task : tasks) {
            this.schedule(task);
        }
    }

    /**
     * @return the number of worker threads this scheduler was configured with
     */
    public int getTotalNumberOfWorkers() {
        return this.nWorkers;
    }

    /**
     * @return the current size of the executor's work queue
     */
    public int getNumberOfTasks() {
        return this._taskQueue.size();
    }

    /**
     * Builds the worker pool: core and maximum size are both {@code nWorkers}, idle threads
     * (including core threads) time out after 60 seconds.
     */
    private ExecutorService createExecutorService() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(this.nWorkers, this.nWorkers, 60L, TimeUnit.SECONDS,
                this._taskQueue, new NamingThreadFactory(this.name));
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    /**
     * Stops the worker pool via {@link ExecutorService#shutdownNow()} and waits up to
     * {@value #TERMINATION_TIMEOUT_SECONDS} seconds for it to terminate. Does nothing if the pool has
     * already terminated.
     *
     * @throws FedXRuntimeException if the calling thread is interrupted while waiting; the interrupt
     *                              flag is restored before throwing
     */
    @Override
    public void abort() {
        if (this.executor.isTerminated()) {
            return;
        }
        log.info("Aborting workers of {}.", this.name);
        this.executor.shutdownNow();
        this.awaitTermination();
    }

    /** No-op for this scheduler. */
    @Override
    public void done() {
    }

    /**
     * Not supported by this scheduler.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void handleResult(CloseableIteration<T, QueryEvaluationException> res) {
        throw new UnsupportedOperationException("Unsupported Operation for this scheduler.");
    }

    /**
     * Not supported by this scheduler.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void informFinish() {
        throw new UnsupportedOperationException("Unsupported Operation for this scheduler!");
    }

    /**
     * No-op for this scheduler.
     *
     * @param control not used by this implementation
     */
    public void informFinish(ParallelExecutor<T> control) {
    }

    /**
     * Not supported by this scheduler.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean isRunning() {
        throw new UnsupportedOperationException("Unsupported Operation for this scheduler.");
    }

    /**
     * @param control not used by this implementation
     * @return always {@code true}
     */
    public boolean isRunning(ParallelExecutor<T> control) {
        return true;
    }

    /**
     * Not supported by this scheduler.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void toss(Exception e) {
        throw new UnsupportedOperationException("Unsupported Operation for this scheduler.");
    }

    /**
     * Initiates an orderly shutdown via {@link ExecutorService#shutdown()} and waits up to
     * {@value #TERMINATION_TIMEOUT_SECONDS} seconds for the pool to terminate.
     *
     * @throws FedXRuntimeException if the calling thread is interrupted while waiting; the interrupt
     *                              flag is restored before throwing
     */
    @Override
    public void shutdown() {
        this.executor.shutdown();
        this.awaitTermination();
    }

    /**
     * Sets the wrapper applied to every runnable before it is submitted.
     *
     * @param taskWrapper the wrapper, or {@code null} to submit runnables unwrapped
     */
    @Override
    public void setTaskWrapper(TaskWrapper taskWrapper) {
        this.taskWrapper = taskWrapper;
    }

    /**
     * Waits for the executor to terminate and logs a warning if the timeout elapses first.
     * Shared by {@link #abort()} and {@link #shutdown()}.
     */
    private void awaitTermination() {
        try {
            boolean terminated = this.executor.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (!terminated) {
                log.warn("Workers of {} did not terminate within {} seconds.", this.name,
                        TERMINATION_TIMEOUT_SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new FedXRuntimeException(e);
        }
    }

    /** Simple mutable holder for a waiting count and a done flag. */
    protected class ControlStatus {
        public int waiting;
        public boolean done;

        public ControlStatus(int waiting, boolean done) {
            this.waiting = waiting;
            this.done = done;
        }
    }

    /**
     * Runnable executed by a worker thread: performs one {@link ParallelTask} and reports the result
     * or the failure to the task's {@link ParallelExecutor}.
     */
    class WorkerRunnable implements Runnable {
        private final ParallelTask<T> task;
        private final ParallelExecutor<T> taskControl;
        private volatile boolean aborted = false;

        public WorkerRunnable(ParallelTask<T> task) {
            this.task = task;
            this.taskControl = task.getControl();
        }

        /**
         * Performs the task and adds its result to the task control.
         *
         * <p>
         * If the runnable was aborted, the thread is interrupted, or the control is already finished,
         * the task is not performed and an {@link InterruptedException} is reported instead. Any
         * throwable is passed to the control via {@code toss}; afterwards the result (if any) is
         * closed and the task is cancelled, each step running even if the previous one failed.
         */
        @Override
        public void run() {
            // Typed rather than raw so the compiler checks the element and exception types.
            CloseableIteration<T, QueryEvaluationException> res = null;
            try {
                if (this.aborted || Thread.currentThread().isInterrupted() || this.taskControl.isFinished()) {
                    throw new InterruptedException();
                }
                log.trace("Performing task {} in {}", this.task, Thread.currentThread().getName());
                res = this.task.performTask();
                this.taskControl.addResult(res);
                // The abort flag may have been set while the task was running.
                if (this.aborted) {
                    res.close();
                }
                this.taskControl.done();
            } catch (Throwable t) {
                try {
                    if (t instanceof InterruptedException) {
                        // Restore the interrupt status for the worker thread.
                        Thread.currentThread().interrupt();
                    }
                    log.debug("Exception encountered while evaluating task ({}): {}", t.getClass().getSimpleName(),
                            t.getMessage());
                } finally {
                    // Nested finally blocks: toss, close and cancel must each be attempted
                    // regardless of whether the preceding step threw.
                    try {
                        this.taskControl.toss(ExceptionUtil.toException(t));
                    } finally {
                        try {
                            if (res != null) {
                                res.close();
                            }
                        } finally {
                            this.task.cancel();
                        }
                    }
                }
            }
        }

        /** Marks this runnable as aborted; checked before and after the task is performed. */
        public void abort() {
            this.aborted = true;
        }
    }
}

