001/* 002 * Copyright 2009-2020 Ping Identity Corporation 003 * All Rights Reserved. 004 */ 005/* 006 * Copyright 2009-2020 Ping Identity Corporation 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020/* 021 * Copyright (C) 2009-2020 Ping Identity Corporation 022 * 023 * This program is free software; you can redistribute it and/or modify 024 * it under the terms of the GNU General Public License (GPLv2 only) 025 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only) 026 * as published by the Free Software Foundation. 027 * 028 * This program is distributed in the hope that it will be useful, 029 * but WITHOUT ANY WARRANTY; without even the implied warranty of 030 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 031 * GNU General Public License for more details. 032 * 033 * You should have received a copy of the GNU General Public License 034 * along with this program; if not, see <http://www.gnu.org/licenses>. 035 */ 036package com.unboundid.ldap.sdk; 037 038 039 040import java.util.concurrent.LinkedBlockingQueue; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.atomic.AtomicBoolean; 043import java.util.concurrent.atomic.AtomicReference; 044 045import com.unboundid.util.Debug; 046import com.unboundid.util.InternalUseOnly; 047import com.unboundid.util.ThreadSafety; 048import com.unboundid.util.ThreadSafetyLevel; 049import com.unboundid.util.Validator; 050 051import static com.unboundid.ldap.sdk.LDAPMessages.*; 052 053 054 055/** 056 * This class provides an {@link EntrySource} that will read entries matching a 057 * given set of search criteria from an LDAP directory server. It may 058 * optionally close the associated connection after all entries have been read. 059 * <BR><BR> 060 * This implementation processes the search asynchronously, which provides two 061 * benefits: 062 * <UL> 063 * <LI>It makes it easier to provide a throttling mechanism to prevent the 064 * entries from piling up and causing the client to run out of memory if 065 * the server returns them faster than the client can process them. If 066 * this occurs, then the client will queue up a small number of entries 067 * but will then push back against the server to block it from sending 068 * additional entries until the client can catch up. In this case, no 069 * entries should be lost, although some servers may impose limits on how 070 * long a search may be active or other forms of constraints.</LI> 071 * <LI>It makes it possible to abandon the search if the entry source is no 072 * longer needed (as signified by calling the {@link #close} method) and 073 * the caller intends to stop iterating through the results.</LI> 074 * </UL> 075 * <H2>Example</H2> 076 * The following example demonstrates the process that may be used for iterating 077 * across all entries containing the {@code person} object class using the LDAP 078 * entry source API: 079 * <PRE> 080 * SearchRequest searchRequest = new SearchRequest("dc=example,dc=com", 081 * SearchScope.SUB, Filter.createEqualityFilter("objectClass", "person")); 082 * LDAPEntrySource entrySource = new LDAPEntrySource(connection, 083 * searchRequest, false); 084 * 085 * int entriesRead = 0; 086 * int referencesRead = 0; 087 * int exceptionsCaught = 0; 088 * try 089 * { 090 * while (true) 091 * { 092 * try 093 * { 094 * Entry entry = entrySource.nextEntry(); 095 * if (entry == null) 096 * { 097 * // There are no more entries to be read. 098 * break; 099 * } 100 * else 101 * { 102 * // Do something with the entry here. 103 * entriesRead++; 104 * } 105 * } 106 * catch (SearchResultReferenceEntrySourceException e) 107 * { 108 * // The directory server returned a search result reference. 109 * SearchResultReference searchReference = e.getSearchReference(); 110 * referencesRead++; 111 * } 112 * catch (EntrySourceException e) 113 * { 114 * // Some kind of problem was encountered (e.g., the connection is no 115 * // longer valid). See if we can continue reading entries. 116 * exceptionsCaught++; 117 * if (! e.mayContinueReading()) 118 * { 119 * break; 120 * } 121 * } 122 * } 123 * } 124 * finally 125 * { 126 * entrySource.close(); 127 * } 128 * </PRE> 129 */ 130@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE) 131public final class LDAPEntrySource 132 extends EntrySource 133 implements AsyncSearchResultListener 134{ 135 /** 136 * The bogus entry that will be used to signify the end of the results. 137 */ 138 private static final String END_OF_RESULTS = "END OF RESULTS"; 139 140 141 142 /** 143 * The serial version UID for this serializable class. 144 */ 145 private static final long serialVersionUID = 1080386705549149135L; 146 147 148 149 // The request ID associated with the asynchronous search. 150 private final AsyncRequestID asyncRequestID; 151 152 // Indicates whether this entry source has been closed. 153 private final AtomicBoolean closed; 154 155 // The search result for the search operation. 156 private final AtomicReference<SearchResult> searchResult; 157 158 // Indicates whether to close the connection when this entry source is closed. 159 private final boolean closeConnection; 160 161 // The connection that will be used to read the entries. 162 private final LDAPConnection connection; 163 164 // The queue from which entries will be read. 165 private final LinkedBlockingQueue<Object> queue; 166 167 168 169 /** 170 * Creates a new LDAP entry source with the provided information. 171 * 172 * @param connection The connection to the directory server from which 173 * the entries will be read. It must not be 174 * {@code null}. 175 * @param searchRequest The search request that will be used to identify 176 * which entries should be returned. It must not be 177 * {@code null}, and it must not be configured with a 178 * {@link SearchResultListener}. 179 * @param closeConnection Indicates whether the provided connection should 180 * be closed whenever all of the entries have been 181 * read, or if the {@link #close} method is called. 182 * 183 * @throws LDAPException If there is a problem with the provided search 184 * request or when trying to communicate with the 185 * directory server over the provided connection. 186 */ 187 public LDAPEntrySource(final LDAPConnection connection, 188 final SearchRequest searchRequest, 189 final boolean closeConnection) 190 throws LDAPException 191 { 192 this(connection, searchRequest, closeConnection, 100); 193 } 194 195 196 197 /** 198 * Creates a new LDAP entry source with the provided information. 199 * 200 * @param connection The connection to the directory server from which 201 * the entries will be read. It must not be 202 * {@code null}. 203 * @param searchRequest The search request that will be used to identify 204 * which entries should be returned. It must not be 205 * {@code null}, and it must not be configured with a 206 * {@link SearchResultListener}. 207 * @param closeConnection Indicates whether the provided connection should 208 * be closed whenever all of the entries have been 209 * read, or if the {@link #close} method is called. 210 * @param queueSize The size of the internal queue used to hold search 211 * result entries until they can be consumed by the 212 * {@link #nextEntry} method. The value must be 213 * greater than zero. 214 * 215 * @throws LDAPException If there is a problem with the provided search 216 * request or when trying to communicate with the 217 * directory server over the provided connection. 218 */ 219 public LDAPEntrySource(final LDAPConnection connection, 220 final SearchRequest searchRequest, 221 final boolean closeConnection, 222 final int queueSize) 223 throws LDAPException 224 { 225 Validator.ensureNotNull(connection, searchRequest); 226 Validator.ensureTrue(queueSize > 0, 227 "LDAPEntrySource.queueSize must be greater than 0."); 228 229 this.connection = connection; 230 this.closeConnection = closeConnection; 231 232 if (searchRequest.getSearchResultListener() != null) 233 { 234 throw new LDAPException(ResultCode.PARAM_ERROR, 235 ERR_LDAP_ENTRY_SOURCE_REQUEST_HAS_LISTENER.get()); 236 } 237 238 closed = new AtomicBoolean(false); 239 searchResult = new AtomicReference<>(); 240 queue = new LinkedBlockingQueue<>(queueSize); 241 242 final SearchRequest r = new SearchRequest(this, searchRequest.getControls(), 243 searchRequest.getBaseDN(), searchRequest.getScope(), 244 searchRequest.getDereferencePolicy(), searchRequest.getSizeLimit(), 245 searchRequest.getTimeLimitSeconds(), searchRequest.typesOnly(), 246 searchRequest.getFilter(), searchRequest.getAttributes()); 247 asyncRequestID = connection.asyncSearch(r); 248 } 249 250 251 252 /** 253 * {@inheritDoc} 254 */ 255 @Override() 256 public Entry nextEntry() 257 throws EntrySourceException 258 { 259 while (true) 260 { 261 if (closed.get() && queue.isEmpty()) 262 { 263 return null; 264 } 265 266 final Object o; 267 try 268 { 269 o = queue.poll(10L, TimeUnit.MILLISECONDS); 270 } 271 catch (final InterruptedException ie) 272 { 273 Debug.debugException(ie); 274 Thread.currentThread().interrupt(); 275 throw new EntrySourceException(true, 276 ERR_LDAP_ENTRY_SOURCE_NEXT_ENTRY_INTERRUPTED.get(), ie); 277 } 278 279 if (o != null) 280 { 281 if (o == END_OF_RESULTS) 282 { 283 return null; 284 } 285 else if (o instanceof Entry) 286 { 287 return (Entry) o; 288 } 289 else 290 { 291 throw (EntrySourceException) o; 292 } 293 } 294 } 295 } 296 297 298 299 /** 300 * {@inheritDoc} 301 */ 302 @Override() 303 public void close() 304 { 305 closeInternal(true); 306 } 307 308 309 310 /** 311 * Closes this LDAP entry source. 312 * 313 * @param abandon Indicates whether to attempt to abandon the search. 314 */ 315 private void closeInternal(final boolean abandon) 316 { 317 addToQueue(END_OF_RESULTS); 318 319 if (closed.compareAndSet(false, true)) 320 { 321 if (abandon) 322 { 323 try 324 { 325 connection.abandon(asyncRequestID); 326 } 327 catch (final Exception e) 328 { 329 Debug.debugException(e); 330 } 331 } 332 333 if (closeConnection) 334 { 335 connection.close(); 336 } 337 } 338 } 339 340 341 342 /** 343 * Retrieves the search result for the search operation, if available. It 344 * will not be available until the search has completed (as indicated by a 345 * {@code null} return value from the {@link #nextEntry} method). 346 * 347 * @return The search result for the search operation, or {@code null} if it 348 * is not available (e.g., because the search has not yet completed). 349 */ 350 public SearchResult getSearchResult() 351 { 352 return searchResult.get(); 353 } 354 355 356 357 /** 358 * {@inheritDoc} This is intended for internal use only and should not be 359 * called by anything outside of the LDAP SDK itself. 360 */ 361 @InternalUseOnly() 362 @Override() 363 public void searchEntryReturned(final SearchResultEntry searchEntry) 364 { 365 addToQueue(searchEntry); 366 } 367 368 369 370 /** 371 * {@inheritDoc} This is intended for internal use only and should not be 372 * called by anything outside of the LDAP SDK itself. 373 */ 374 @InternalUseOnly() 375 @Override() 376 public void searchReferenceReturned( 377 final SearchResultReference searchReference) 378 { 379 addToQueue(new SearchResultReferenceEntrySourceException(searchReference)); 380 } 381 382 383 384 /** 385 * {@inheritDoc} This is intended for internal use only and should not be 386 * called by anything outside of the LDAP SDK itself. 387 */ 388 @InternalUseOnly() 389 @Override() 390 public void searchResultReceived(final AsyncRequestID requestID, 391 final SearchResult searchResult) 392 { 393 this.searchResult.set(searchResult); 394 395 if (! searchResult.getResultCode().equals(ResultCode.SUCCESS)) 396 { 397 addToQueue(new EntrySourceException(false, 398 new LDAPSearchException(searchResult))); 399 } 400 401 closeInternal(false); 402 } 403 404 405 406 /** 407 * Adds the provided object to the queue, waiting as long as needed until it 408 * has been added. 409 * 410 * @param o The object to be added. It must not be {@code null}. 411 */ 412 private void addToQueue(final Object o) 413 { 414 while (true) 415 { 416 if (closed.get()) 417 { 418 return; 419 } 420 421 try 422 { 423 if (queue.offer(o, 100L, TimeUnit.MILLISECONDS)) 424 { 425 return; 426 } 427 } 428 catch (final InterruptedException ie) 429 { 430 Debug.debugException(ie); 431 } 432 } 433 } 434}