001// License: GPL. For details, see LICENSE file.
002package org.openstreetmap.josm.data.cache;
003
004import java.io.IOException;
005import java.net.URL;
006import java.util.Iterator;
007import java.util.Map;
008import java.util.concurrent.ConcurrentHashMap;
009import java.util.concurrent.LinkedBlockingDeque;
010import java.util.concurrent.Semaphore;
011import java.util.concurrent.ThreadPoolExecutor;
012import java.util.concurrent.TimeUnit;
013
014import org.openstreetmap.josm.tools.Logging;
015
016/**
017 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task
018 * and it will set a runnable task with semaphore release, when job has finished.
019 * <p>
020 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't
021 * guarantee that all threads will be busy, when there is work for them[2]. <br>
022 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus
023 *     tasks do not go through the Queue <br>
024 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread
025 *     take the first available job and wait for semaphore. It might be the case, that semaphore was released
026 *     for some task further in queue, but this implementation doesn't try to detect such situation
027 *
028 * @author Wiktor Niesiobędzki
029 */
030public class HostLimitQueue extends LinkedBlockingDeque<Runnable> {
031    private static final long serialVersionUID = 1L;
032
033    private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>();
034    private final int hostLimit;
035
036    private ThreadPoolExecutor executor;
037
038    private int corePoolSize;
039
040    private int maximumPoolSize;
041
042    /**
043     * Creates an unbounded queue
044     * @param hostLimit how many parallel calls to host to allow
045     */
046    public HostLimitQueue(int hostLimit) {
047        super(); // create unbounded queue
048        this.hostLimit = hostLimit;
049    }
050
051    private JCSCachedTileLoaderJob<?, ?> findJob() {
052        for (Iterator<Runnable> it = iterator(); it.hasNext();) {
053            Runnable r = it.next();
054            if (r instanceof JCSCachedTileLoaderJob) {
055                JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r;
056                if (tryAcquireSemaphore(job)) {
057                    if (remove(job)) {
058                        return job;
059                    } else {
060                        // we have acquired the semaphore, but we didn't manage to remove job, as someone else did
061                        // release the semaphore and look for another candidate
062                        releaseSemaphore(job);
063                    }
064                } else {
065                    URL url = null;
066                    try {
067                        url = job.getUrl();
068                    } catch (IOException e) {
069                        Logging.debug(e);
070                    }
071                    Logging.debug("TMS - Skipping job {0} because host limit reached", url);
072                }
073            }
074        }
075        return null;
076    }
077
078    @Override
079    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
080        Runnable job = findJob();
081        if (job != null) {
082            return job;
083        }
084        job = pollFirst(timeout, unit);
085        if (job != null) {
086            try {
087                boolean gotLock = tryAcquireSemaphore(job, timeout, unit);
088                return gotLock ? job : null;
089            } catch (InterruptedException e) {
090                // acquire my got interrupted, first offer back what was taken
091                if (!offer(job)) {
092                    Logging.warn("Unable to offer back " + job);
093                }
094                throw e;
095            }
096        }
097        return job;
098    }
099
100    @Override
101    public Runnable take() throws InterruptedException {
102        Runnable job = findJob();
103        if (job != null) {
104            return job;
105        }
106        job = takeFirst();
107        try {
108            acquireSemaphore(job);
109        } catch (InterruptedException e) {
110            // acquire my got interrupted, first offer back what was taken
111            if (!offer(job)) {
112                Logging.warn("Unable to offer back " + job);
113            }
114            throw e;
115        }
116        return job;
117    }
118
119    /**
120     * Set the executor for which this queue works. It's needed to spawn new threads.
121     * See: http://stackoverflow.com/questions/9622599/java-threadpoolexecutor-strategy-direct-handoff-with-queue#
122     *
123     * @param executor executor for which this queue works
124     */
125    public void setExecutor(ThreadPoolExecutor executor) {
126        this.executor = executor;
127        this.maximumPoolSize = executor.getMaximumPoolSize();
128        this.corePoolSize = executor.getCorePoolSize();
129    }
130
131    @Override
132    public boolean offer(Runnable e) {
133        if (!super.offer(e)) {
134            return false;
135        }
136
137        if (executor != null) {
138            // See: http://stackoverflow.com/questions/9622599/java-threadpoolexecutor-strategy-direct-handoff-with-queue#
139            // force spawn of a thread if not reached maximum
140            int currentPoolSize = executor.getPoolSize();
141            if (currentPoolSize < maximumPoolSize
142                    && currentPoolSize >= corePoolSize) {
143                executor.setCorePoolSize(currentPoolSize + 1);
144                executor.setCorePoolSize(corePoolSize);
145            }
146        }
147        return true;
148    }
149
150    private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
151        String host;
152        try {
153            host = job.getUrl().getHost();
154        } catch (IOException e) {
155            // do not pass me illegal URL's
156            throw new IllegalArgumentException(e);
157        }
158        Semaphore limit = hostSemaphores.get(host);
159        if (limit == null) {
160            synchronized (hostSemaphores) {
161                limit = hostSemaphores.computeIfAbsent(host, k -> new Semaphore(hostLimit));
162            }
163        }
164        return limit;
165    }
166
167    private void acquireSemaphore(Runnable job) throws InterruptedException {
168        if (job instanceof JCSCachedTileLoaderJob) {
169            final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
170            getSemaphore(jcsJob).acquire();
171            jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
172        }
173    }
174
175    private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) {
176        boolean ret = true;
177        Semaphore limit = getSemaphore(job);
178        if (limit != null) {
179            ret = limit.tryAcquire();
180            if (ret) {
181                job.setFinishedTask(() -> releaseSemaphore(job));
182            }
183        }
184        return ret;
185    }
186
187    private boolean tryAcquireSemaphore(Runnable job, long timeout, TimeUnit unit) throws InterruptedException {
188        boolean ret = true;
189        if (job instanceof JCSCachedTileLoaderJob) {
190            final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
191            Semaphore limit = getSemaphore(jcsJob);
192            if (limit != null) {
193                ret = limit.tryAcquire(timeout, unit);
194                if (ret) {
195                    jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
196                }
197            }
198        }
199        return ret;
200    }
201
202    private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
203        Semaphore limit = getSemaphore(job);
204        if (limit != null) {
205            limit.release();
206            if (limit.availablePermits() > hostLimit) {
207                Logging.warn("More permits than it should be");
208            }
209        }
210    }
211}