001/*
002 * Copyright 2009-2020 Ping Identity Corporation
003 * All Rights Reserved.
004 */
005/*
006 * Copyright 2009-2020 Ping Identity Corporation
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *    http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020/*
021 * Copyright (C) 2009-2020 Ping Identity Corporation
022 *
023 * This program is free software; you can redistribute it and/or modify
024 * it under the terms of the GNU General Public License (GPLv2 only)
025 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
026 * as published by the Free Software Foundation.
027 *
028 * This program is distributed in the hope that it will be useful,
029 * but WITHOUT ANY WARRANTY; without even the implied warranty of
030 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
031 * GNU General Public License for more details.
032 *
033 * You should have received a copy of the GNU General Public License
034 * along with this program; if not, see <http://www.gnu.org/licenses>.
035 */
036package com.unboundid.ldap.sdk;
037
038
039
040import java.util.concurrent.LinkedBlockingQueue;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.atomic.AtomicBoolean;
043import java.util.concurrent.atomic.AtomicReference;
044
045import com.unboundid.util.Debug;
046import com.unboundid.util.InternalUseOnly;
047import com.unboundid.util.ThreadSafety;
048import com.unboundid.util.ThreadSafetyLevel;
049import com.unboundid.util.Validator;
050
051import static com.unboundid.ldap.sdk.LDAPMessages.*;
052
053
054
055/**
056 * This class provides an {@link EntrySource} that will read entries matching a
057 * given set of search criteria from an LDAP directory server.  It may
058 * optionally close the associated connection after all entries have been read.
059 * <BR><BR>
060 * This implementation processes the search asynchronously, which provides two
061 * benefits:
062 * <UL>
063 *   <LI>It makes it easier to provide a throttling mechanism to prevent the
064 *       entries from piling up and causing the client to run out of memory if
065 *       the server returns them faster than the client can process them.  If
066 *       this occurs, then the client will queue up a small number of entries
067 *       but will then push back against the server to block it from sending
068 *       additional entries until the client can catch up.  In this case, no
069 *       entries should be lost, although some servers may impose limits on how
070 *       long a search may be active or other forms of constraints.</LI>
071 *   <LI>It makes it possible to abandon the search if the entry source is no
072 *       longer needed (as signified by calling the {@link #close} method) and
073 *       the caller intends to stop iterating through the results.</LI>
074 * </UL>
075 * <H2>Example</H2>
076 * The following example demonstrates the process that may be used for iterating
077 * across all entries containing the {@code person} object class using the LDAP
078 * entry source API:
079 * <PRE>
080 * SearchRequest searchRequest = new SearchRequest("dc=example,dc=com",
081 *      SearchScope.SUB, Filter.createEqualityFilter("objectClass", "person"));
082 * LDAPEntrySource entrySource = new LDAPEntrySource(connection,
083 *      searchRequest, false);
084 *
085 * int entriesRead = 0;
086 * int referencesRead = 0;
087 * int exceptionsCaught = 0;
088 * try
089 * {
090 *   while (true)
091 *   {
092 *     try
093 *     {
094 *       Entry entry = entrySource.nextEntry();
095 *       if (entry == null)
096 *       {
097 *         // There are no more entries to be read.
098 *         break;
099 *       }
100 *       else
101 *       {
102 *         // Do something with the entry here.
103 *         entriesRead++;
104 *       }
105 *     }
106 *     catch (SearchResultReferenceEntrySourceException e)
107 *     {
108 *       // The directory server returned a search result reference.
109 *       SearchResultReference searchReference = e.getSearchReference();
110 *       referencesRead++;
111 *     }
112 *     catch (EntrySourceException e)
113 *     {
114 *       // Some kind of problem was encountered (e.g., the connection is no
115 *       // longer valid).  See if we can continue reading entries.
116 *       exceptionsCaught++;
117 *       if (! e.mayContinueReading())
118 *       {
119 *         break;
120 *       }
121 *     }
122 *   }
123 * }
124 * finally
125 * {
126 *   entrySource.close();
127 * }
128 * </PRE>
129 */
130@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE)
131public final class LDAPEntrySource
132       extends EntrySource
133       implements AsyncSearchResultListener
134{
135  /**
136   * The bogus entry that will be used to signify the end of the results.
137   */
138  private static final String END_OF_RESULTS = "END OF RESULTS";
139
140
141
142  /**
143   * The serial version UID for this serializable class.
144   */
145  private static final long serialVersionUID = 1080386705549149135L;
146
147
148
149  // The request ID associated with the asynchronous search.
150  private final AsyncRequestID asyncRequestID;
151
152  // Indicates whether this entry source has been closed.
153  private final AtomicBoolean closed;
154
155  // The search result for the search operation.
156  private final AtomicReference<SearchResult> searchResult;
157
158  // Indicates whether to close the connection when this entry source is closed.
159  private final boolean closeConnection;
160
161  // The connection that will be used to read the entries.
162  private final LDAPConnection connection;
163
164  // The queue from which entries will be read.
165  private final LinkedBlockingQueue<Object> queue;
166
167
168
169  /**
170   * Creates a new LDAP entry source with the provided information.
171   *
172   * @param  connection       The connection to the directory server from which
173   *                          the entries will be read.  It must not be
174   *                          {@code null}.
175   * @param  searchRequest    The search request that will be used to identify
176   *                          which entries should be returned.  It must not be
177   *                          {@code null}, and it must not be configured with a
178   *                          {@link SearchResultListener}.
179   * @param  closeConnection  Indicates whether the provided connection should
180   *                          be closed whenever all of the entries have been
181   *                          read, or if the {@link #close} method is called.
182   *
183   * @throws  LDAPException  If there is a problem with the provided search
184   *                         request or when trying to communicate with the
185   *                         directory server over the provided connection.
186   */
187  public LDAPEntrySource(final LDAPConnection connection,
188                         final SearchRequest searchRequest,
189                         final boolean closeConnection)
190         throws LDAPException
191  {
192    this(connection, searchRequest, closeConnection, 100);
193  }
194
195
196
197  /**
198   * Creates a new LDAP entry source with the provided information.
199   *
200   * @param  connection       The connection to the directory server from which
201   *                          the entries will be read.  It must not be
202   *                          {@code null}.
203   * @param  searchRequest    The search request that will be used to identify
204   *                          which entries should be returned.  It must not be
205   *                          {@code null}, and it must not be configured with a
206   *                          {@link SearchResultListener}.
207   * @param  closeConnection  Indicates whether the provided connection should
208   *                          be closed whenever all of the entries have been
209   *                          read, or if the {@link #close} method is called.
210   * @param  queueSize        The size of the internal queue used to hold search
211   *                          result entries until they can be consumed by the
212   *                          {@link #nextEntry} method.  The value must be
213   *                          greater than zero.
214   *
215   * @throws  LDAPException  If there is a problem with the provided search
216   *                         request or when trying to communicate with the
217   *                         directory server over the provided connection.
218   */
219  public LDAPEntrySource(final LDAPConnection connection,
220                         final SearchRequest searchRequest,
221                         final boolean closeConnection,
222                         final int queueSize)
223         throws LDAPException
224  {
225    Validator.ensureNotNull(connection, searchRequest);
226    Validator.ensureTrue(queueSize > 0,
227         "LDAPEntrySource.queueSize must be greater than 0.");
228
229    this.connection      = connection;
230    this.closeConnection = closeConnection;
231
232    if (searchRequest.getSearchResultListener() != null)
233    {
234      throw new LDAPException(ResultCode.PARAM_ERROR,
235                              ERR_LDAP_ENTRY_SOURCE_REQUEST_HAS_LISTENER.get());
236    }
237
238    closed       = new AtomicBoolean(false);
239    searchResult = new AtomicReference<>();
240    queue        = new LinkedBlockingQueue<>(queueSize);
241
242    final SearchRequest r = new SearchRequest(this, searchRequest.getControls(),
243         searchRequest.getBaseDN(), searchRequest.getScope(),
244         searchRequest.getDereferencePolicy(), searchRequest.getSizeLimit(),
245         searchRequest.getTimeLimitSeconds(), searchRequest.typesOnly(),
246         searchRequest.getFilter(), searchRequest.getAttributes());
247    asyncRequestID = connection.asyncSearch(r);
248  }
249
250
251
252  /**
253   * {@inheritDoc}
254   */
255  @Override()
256  public Entry nextEntry()
257         throws EntrySourceException
258  {
259    while (true)
260    {
261      if (closed.get() && queue.isEmpty())
262      {
263        return null;
264      }
265
266      final Object o;
267      try
268      {
269        o = queue.poll(10L, TimeUnit.MILLISECONDS);
270      }
271      catch (final InterruptedException ie)
272      {
273        Debug.debugException(ie);
274        Thread.currentThread().interrupt();
275        throw new EntrySourceException(true,
276             ERR_LDAP_ENTRY_SOURCE_NEXT_ENTRY_INTERRUPTED.get(), ie);
277      }
278
279      if (o != null)
280      {
281        if (o == END_OF_RESULTS)
282        {
283          return null;
284        }
285        else if (o instanceof Entry)
286        {
287          return (Entry) o;
288        }
289        else
290        {
291          throw (EntrySourceException) o;
292        }
293      }
294    }
295  }
296
297
298
299  /**
300   * {@inheritDoc}
301   */
302  @Override()
303  public void close()
304  {
305    closeInternal(true);
306  }
307
308
309
310  /**
311   * Closes this LDAP entry source.
312   *
313   * @param  abandon  Indicates whether to attempt to abandon the search.
314   */
315  private void closeInternal(final boolean abandon)
316  {
317    addToQueue(END_OF_RESULTS);
318
319    if (closed.compareAndSet(false, true))
320    {
321      if (abandon)
322      {
323        try
324        {
325          connection.abandon(asyncRequestID);
326        }
327        catch (final Exception e)
328        {
329          Debug.debugException(e);
330        }
331      }
332
333      if (closeConnection)
334      {
335        connection.close();
336      }
337    }
338  }
339
340
341
342  /**
343   * Retrieves the search result for the search operation, if available.  It
344   * will not be available until the search has completed (as indicated by a
345   * {@code null} return value from the {@link #nextEntry} method).
346   *
347   * @return  The search result for the search operation, or {@code null} if it
348   *          is not available (e.g., because the search has not yet completed).
349   */
350  public SearchResult getSearchResult()
351  {
352    return searchResult.get();
353  }
354
355
356
357  /**
358   * {@inheritDoc}  This is intended for internal use only and should not be
359   * called by anything outside of the LDAP SDK itself.
360   */
361  @InternalUseOnly()
362  @Override()
363  public void searchEntryReturned(final SearchResultEntry searchEntry)
364  {
365    addToQueue(searchEntry);
366  }
367
368
369
370  /**
371   * {@inheritDoc}  This is intended for internal use only and should not be
372   * called by anything outside of the LDAP SDK itself.
373   */
374  @InternalUseOnly()
375  @Override()
376  public void searchReferenceReturned(
377                   final SearchResultReference searchReference)
378  {
379    addToQueue(new SearchResultReferenceEntrySourceException(searchReference));
380  }
381
382
383
384  /**
385   * {@inheritDoc}  This is intended for internal use only and should not be
386   * called by anything outside of the LDAP SDK itself.
387   */
388  @InternalUseOnly()
389  @Override()
390  public void searchResultReceived(final AsyncRequestID requestID,
391                                   final SearchResult searchResult)
392  {
393    this.searchResult.set(searchResult);
394
395    if (! searchResult.getResultCode().equals(ResultCode.SUCCESS))
396    {
397      addToQueue(new EntrySourceException(false,
398           new LDAPSearchException(searchResult)));
399    }
400
401    closeInternal(false);
402  }
403
404
405
406  /**
407   * Adds the provided object to the queue, waiting as long as needed until it
408   * has been added.
409   *
410   * @param  o  The object to be added.  It must not be {@code null}.
411   */
412  private void addToQueue(final Object o)
413  {
414    while (true)
415    {
416      if (closed.get())
417      {
418        return;
419      }
420
421      try
422      {
423        if (queue.offer(o, 100L, TimeUnit.MILLISECONDS))
424        {
425          return;
426        }
427      }
428      catch (final InterruptedException ie)
429      {
430        Debug.debugException(ie);
431      }
432    }
433  }
434}