/*
 * Copyright 2016-2020 Ping Identity Corporation
 * All Rights Reserved.
 */
/*
 * Copyright 2016-2020 Ping Identity Corporation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/*
 * Copyright (C) 2016-2020 Ping Identity Corporation
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License (GPLv2 only)
 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, see <http://www.gnu.org/licenses>.
 */
package com.unboundid.ldap.listener;



import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import com.unboundid.ldap.protocol.AbandonRequestProtocolOp;
import com.unboundid.ldap.protocol.AddRequestProtocolOp;
import com.unboundid.ldap.protocol.AddResponseProtocolOp;
import com.unboundid.ldap.protocol.BindRequestProtocolOp;
import com.unboundid.ldap.protocol.BindResponseProtocolOp;
import com.unboundid.ldap.protocol.CompareRequestProtocolOp;
import com.unboundid.ldap.protocol.CompareResponseProtocolOp;
import com.unboundid.ldap.protocol.DeleteRequestProtocolOp;
import com.unboundid.ldap.protocol.DeleteResponseProtocolOp;
import com.unboundid.ldap.protocol.ExtendedRequestProtocolOp;
import com.unboundid.ldap.protocol.ExtendedResponseProtocolOp;
import com.unboundid.ldap.protocol.LDAPMessage;
import com.unboundid.ldap.protocol.ModifyRequestProtocolOp;
import com.unboundid.ldap.protocol.ModifyResponseProtocolOp;
import com.unboundid.ldap.protocol.ModifyDNRequestProtocolOp;
import com.unboundid.ldap.protocol.ModifyDNResponseProtocolOp;
import com.unboundid.ldap.protocol.SearchRequestProtocolOp;
import com.unboundid.ldap.protocol.SearchResultDoneProtocolOp;
import com.unboundid.ldap.sdk.Control;
import com.unboundid.ldap.sdk.LDAPException;
import com.unboundid.ldap.sdk.OperationType;
import com.unboundid.ldap.sdk.ResultCode;
import com.unboundid.util.Debug;
import com.unboundid.util.NotMutable;
import com.unboundid.util.StaticUtils;
import com.unboundid.util.ThreadSafety;
import com.unboundid.util.ThreadSafetyLevel;
import com.unboundid.util.Validator;

import static com.unboundid.ldap.listener.ListenerMessages.*;



/**
 * This class provides an implementation of an LDAP listener request handler
 * that can be used to limit the number of requests that may be processed
 * concurrently.  It uses one or more {@link Semaphore} instances to limit the
 * number of requests that may be processed at any time, and provides the
 * ability to impose limiting on a per-operation-type basis.
 * <BR><BR>
 * Each request acquires a permit from the semaphore configured for its
 * operation type before it is handed to the downstream request handler, and
 * the permit is released again once the downstream handler returns (whether
 * normally or by throwing).  If a permit cannot be acquired, the request is
 * rejected without being passed downstream.
 */
@NotMutable()
@ThreadSafety(level=ThreadSafetyLevel.COMPLETELY_THREADSAFE)
public final class ConcurrentRequestLimiterRequestHandler
       extends LDAPListenerRequestHandler
{
  // The downstream request handler that will be used to process the requests
  // after any appropriate concurrent request limiting has been performed.
  private final LDAPListenerRequestHandler downstreamRequestHandler;

  // A timeout value (expressed in milliseconds) that will cause the operation
  // to be rejected rather than processed if the associated semaphore cannot be
  // acquired in this length of time.  This value is never negative:  a
  // negative constructor argument is stored as Integer.MAX_VALUE.
  private final long rejectTimeoutMillis;

  // The semaphores that will be used for each type of operation.  Any of them
  // may be null, in which case that operation type is not limited.
  private final Semaphore abandonSemaphore;
  private final Semaphore addSemaphore;
  private final Semaphore bindSemaphore;
  private final Semaphore compareSemaphore;
  private final Semaphore deleteSemaphore;
  private final Semaphore extendedSemaphore;
  private final Semaphore modifySemaphore;
  private final Semaphore modifyDNSemaphore;
  private final Semaphore searchSemaphore;



  /**
   * Creates a new concurrent request limiter request handler that will impose
   * the specified limit on the number of operations that may be in progress at
   * any time.  The limit will be enforced for all types of operations except
   * abandon and unbind operations, which will not be limited.
   *
   * @param  downstreamRequestHandler  The downstream request handler that will
   *                                   be used to actually process the requests
   *                                   after any appropriate limiting has been
   *                                   performed.
   * @param  maxConcurrentRequests     The maximum number of requests that may
   *                                   be processed at any given time.  This
   *                                   limit will be enforced for all operation
   *                                   types except abandon and unbind, which
   *                                   will not be limited.
   * @param  rejectTimeoutMillis       A timeout value (expressed in
   *                                   milliseconds) that will cause a requested
   *                                   operation to be rejected rather than
   *                                   processed if the associated semaphore
   *                                   cannot be acquired in this length of
   *                                   time.  A value of zero indicates that the
   *                                   operation should be rejected immediately
   *                                   if the maximum number of concurrent
   *                                   requests are already in progress.  A
   *                                   value that is less than zero indicates
   *                                   that no timeout should be imposed and
   *                                   that requests should be forced to wait as
   *                                   long as necessary until they can be
   *                                   processed.
   */
  public ConcurrentRequestLimiterRequestHandler(
              final LDAPListenerRequestHandler downstreamRequestHandler,
              final int maxConcurrentRequests, final long rejectTimeoutMillis)
  {
    this(downstreamRequestHandler, new Semaphore(maxConcurrentRequests),
         rejectTimeoutMillis);
  }



  /**
   * Creates a new concurrent request limiter request handler that will use the
   * provided semaphore to limit the number of operations that may be in
   * progress at any time.  The limit will be enforced for all types of
   * operations except abandon and unbind operations, which will not be limited.
   *
   * @param  downstreamRequestHandler  The downstream request handler that will
   *                                   be used to actually process the requests
   *                                   after any appropriate limiting has been
   *                                   performed.
   * @param  semaphore                 The semaphore that will be used to limit
   *                                   the number of concurrent operations in
   *                                   progress, for all operation types except
   *                                   abandon and unbind.
   * @param  rejectTimeoutMillis       A timeout value (expressed in
   *                                   milliseconds) that will cause a requested
   *                                   operation to be rejected rather than
   *                                   processed if the associated semaphore
   *                                   cannot be acquired in this length of
   *                                   time.  A value of zero indicates that the
   *                                   operation should be rejected immediately
   *                                   if the maximum number of concurrent
   *                                   requests are already in progress.  A
   *                                   value that is less than zero indicates
   *                                   that no timeout should be imposed and
   *                                   that requests should be forced to wait as
   *                                   long as necessary until they can be
   *                                   processed.
   */
  public ConcurrentRequestLimiterRequestHandler(
              final LDAPListenerRequestHandler downstreamRequestHandler,
              final Semaphore semaphore, final long rejectTimeoutMillis)
  {
    // The abandon semaphore is null so that abandon requests are never limited;
    // every other operation type shares the one provided semaphore.
    this(downstreamRequestHandler, null, semaphore, semaphore, semaphore,
         semaphore, semaphore, semaphore, semaphore, semaphore,
         rejectTimeoutMillis);
  }



  /**
   * Creates a new concurrent request limiter request handler that can use the
   * provided semaphore instances to limit the number of operations in progress
   * concurrently for each type of operation.  The same semaphore instance can
   * be provided for multiple operation types if performance for those
   * operations should be limited in aggregate rather than individually (e.g.,
   * if you don't want the total combined number of search and modify operations
   * in progress at any time to exceed a given threshold, then you could provide
   * the same semaphore instance for the {@code modifySemaphore} and
   * {@code searchSemaphore} arguments).
   *
   * @param  downstreamRequestHandler  The downstream request handler that will
   *                                   be used to actually process the requests
   *                                   after any appropriate concurrent request
   *                                   limiting has been performed.  It must not
   *                                   be {@code null}.
   * @param  abandonSemaphore          The semaphore to use when processing
   *                                   abandon operations.  It may be
   *                                   {@code null} if no concurrent request
   *                                   limiting should be performed for abandon
   *                                   operations.
   * @param  addSemaphore              The semaphore to use when processing add
   *                                   operations.  It may be {@code null} if no
   *                                   concurrent request limiting should be
   *                                   performed for add operations.
   * @param  bindSemaphore             The semaphore to use when processing
   *                                   bind operations.  It may be
   *                                   {@code null} if no concurrent request
   *                                   limiting should be performed for bind
   *                                   operations.
   * @param  compareSemaphore          The semaphore to use when processing
   *                                   compare operations.  It may be
   *                                   {@code null} if no concurrent request
   *                                   limiting should be performed for compare
   *                                   operations.
   * @param  deleteSemaphore           The semaphore to use when processing
   *                                   delete operations.  It may be
   *                                   {@code null} if no concurrent request
   *                                   limiting should be performed for delete
   *                                   operations.
   * @param  extendedSemaphore         The semaphore to use when processing
   *                                   extended operations.  It may be
   *                                   {@code null} if no concurrent request
   *                                   limiting should be performed for extended
   *                                   operations.
   * @param  modifySemaphore           The semaphore to use when processing
   *                                   modify operations.  It may be
   *                                   {@code null} if no concurrent request
   *                                   limiting should be performed for modify
   *                                   operations.
   * @param  modifyDNSemaphore         The semaphore to use when processing
   *                                   modify DN operations.  It may be
   *                                   {@code null} if no concurrent request
   *                                   limiting should be performed for modify
   *                                   DN operations.
   * @param  searchSemaphore           The semaphore to use when processing
   *                                   search operations.  It may be
   *                                   {@code null} if no concurrent request
   *                                   limiting should be performed for search
   *                                   operations.
   * @param  rejectTimeoutMillis       A timeout value (expressed in
   *                                   milliseconds) that will cause a requested
   *                                   operation to be rejected rather than
   *                                   processed if the associated semaphore
   *                                   cannot be acquired in this length of
   *                                   time.  A value of zero indicates that the
   *                                   operation should be rejected immediately
   *                                   if the maximum number of concurrent
   *                                   requests are already in progress.  A
   *                                   value that is less than zero indicates
   *                                   that no timeout should be imposed and
   *                                   that requests should be forced to wait as
   *                                   long as necessary until they can be
   *                                   processed.
   */
  public ConcurrentRequestLimiterRequestHandler(
              final LDAPListenerRequestHandler downstreamRequestHandler,
              final Semaphore abandonSemaphore,
              final Semaphore addSemaphore,
              final Semaphore bindSemaphore,
              final Semaphore compareSemaphore,
              final Semaphore deleteSemaphore,
              final Semaphore extendedSemaphore,
              final Semaphore modifySemaphore,
              final Semaphore modifyDNSemaphore,
              final Semaphore searchSemaphore,
              final long rejectTimeoutMillis)
  {
    Validator.ensureNotNull(downstreamRequestHandler);

    this.downstreamRequestHandler = downstreamRequestHandler;
    this.abandonSemaphore         = abandonSemaphore;
    this.addSemaphore             = addSemaphore;
    this.bindSemaphore            = bindSemaphore;
    this.compareSemaphore         = compareSemaphore;
    this.deleteSemaphore          = deleteSemaphore;
    this.extendedSemaphore        = extendedSemaphore;
    this.modifySemaphore          = modifySemaphore;
    this.modifyDNSemaphore        = modifyDNSemaphore;
    this.searchSemaphore          = searchSemaphore;

    if (rejectTimeoutMillis >= 0L)
    {
      this.rejectTimeoutMillis = rejectTimeoutMillis;
    }
    else
    {
      // A negative value means "wait as long as necessary".  It is represented
      // as a very large finite timeout (Integer.MAX_VALUE milliseconds, a bit
      // under 25 days) so that acquirePermit can always use the timed
      // tryAcquire variant.
      this.rejectTimeoutMillis = (long) Integer.MAX_VALUE;
    }
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public ConcurrentRequestLimiterRequestHandler newInstance(
              final LDAPListenerClientConnection connection)
         throws LDAPException
  {
    // The new instance gets its own downstream handler for the connection but
    // shares this instance's semaphores and timeout.
    return new ConcurrentRequestLimiterRequestHandler(
         downstreamRequestHandler.newInstance(connection), abandonSemaphore,
         addSemaphore, bindSemaphore, compareSemaphore, deleteSemaphore,
         extendedSemaphore, modifySemaphore, modifyDNSemaphore,
         searchSemaphore, rejectTimeoutMillis);
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public void processAbandonRequest(final int messageID,
                                    final AbandonRequestProtocolOp request,
                                    final List<Control> controls)
  {
    try
    {
      acquirePermit(abandonSemaphore, OperationType.ABANDON);
    }
    catch (final LDAPException le)
    {
      // This method returns void, so there is no response message through
      // which the failure could be reported.  Just drop the request.
      Debug.debugException(le);
      return;
    }

    try
    {
      downstreamRequestHandler.processAbandonRequest(messageID, request,
           controls);
    }
    finally
    {
      releasePermit(abandonSemaphore);
    }
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public LDAPMessage processAddRequest(final int messageID,
                                       final AddRequestProtocolOp request,
                                       final List<Control> controls)
  {
    try
    {
      acquirePermit(addSemaphore, OperationType.ADD);
    }
    catch (final LDAPException le)
    {
      Debug.debugException(le);
      return new LDAPMessage(messageID,
           new AddResponseProtocolOp(le.toLDAPResult()));
    }

    try
    {
      return downstreamRequestHandler.processAddRequest(messageID, request,
           controls);
    }
    finally
    {
      releasePermit(addSemaphore);
    }
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public LDAPMessage processBindRequest(final int messageID,
                                        final BindRequestProtocolOp request,
                                        final List<Control> controls)
  {
    try
    {
      acquirePermit(bindSemaphore, OperationType.BIND);
    }
    catch (final LDAPException le)
    {
      Debug.debugException(le);
      return new LDAPMessage(messageID,
           new BindResponseProtocolOp(le.toLDAPResult()));
    }

    try
    {
      return downstreamRequestHandler.processBindRequest(messageID, request,
           controls);
    }
    finally
    {
      releasePermit(bindSemaphore);
    }
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public LDAPMessage processCompareRequest(final int messageID,
                          final CompareRequestProtocolOp request,
                          final List<Control> controls)
  {
    try
    {
      acquirePermit(compareSemaphore, OperationType.COMPARE);
    }
    catch (final LDAPException le)
    {
      Debug.debugException(le);
      return new LDAPMessage(messageID,
           new CompareResponseProtocolOp(le.toLDAPResult()));
    }

    try
    {
      return downstreamRequestHandler.processCompareRequest(messageID, request,
           controls);
    }
    finally
    {
      releasePermit(compareSemaphore);
    }
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public LDAPMessage processDeleteRequest(final int messageID,
                                          final DeleteRequestProtocolOp request,
                                          final List<Control> controls)
  {
    try
    {
      acquirePermit(deleteSemaphore, OperationType.DELETE);
    }
    catch (final LDAPException le)
    {
      Debug.debugException(le);
      return new LDAPMessage(messageID,
           new DeleteResponseProtocolOp(le.toLDAPResult()));
    }

    try
    {
      return downstreamRequestHandler.processDeleteRequest(messageID, request,
           controls);
    }
    finally
    {
      releasePermit(deleteSemaphore);
    }
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public LDAPMessage processExtendedRequest(final int messageID,
                          final ExtendedRequestProtocolOp request,
                          final List<Control> controls)
  {
    try
    {
      acquirePermit(extendedSemaphore, OperationType.EXTENDED);
    }
    catch (final LDAPException le)
    {
      Debug.debugException(le);
      return new LDAPMessage(messageID,
           new ExtendedResponseProtocolOp(le.toLDAPResult()));
    }

    try
    {
      return downstreamRequestHandler.processExtendedRequest(messageID, request,
           controls);
    }
    finally
    {
      releasePermit(extendedSemaphore);
    }
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public LDAPMessage processModifyRequest(final int messageID,
                                          final ModifyRequestProtocolOp request,
                                          final List<Control> controls)
  {
    try
    {
      acquirePermit(modifySemaphore, OperationType.MODIFY);
    }
    catch (final LDAPException le)
    {
      Debug.debugException(le);
      return new LDAPMessage(messageID,
           new ModifyResponseProtocolOp(le.toLDAPResult()));
    }

    try
    {
      return downstreamRequestHandler.processModifyRequest(messageID, request,
           controls);
    }
    finally
    {
      releasePermit(modifySemaphore);
    }
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public LDAPMessage processModifyDNRequest(final int messageID,
                          final ModifyDNRequestProtocolOp request,
                          final List<Control> controls)
  {
    try
    {
      acquirePermit(modifyDNSemaphore, OperationType.MODIFY_DN);
    }
    catch (final LDAPException le)
    {
      Debug.debugException(le);
      return new LDAPMessage(messageID,
           new ModifyDNResponseProtocolOp(le.toLDAPResult()));
    }

    try
    {
      return downstreamRequestHandler.processModifyDNRequest(messageID, request,
           controls);
    }
    finally
    {
      releasePermit(modifyDNSemaphore);
    }
  }



  /**
   * {@inheritDoc}
   */
  @Override()
  public LDAPMessage processSearchRequest(final int messageID,
                                          final SearchRequestProtocolOp request,
                                          final List<Control> controls)
  {
    try
    {
      acquirePermit(searchSemaphore, OperationType.SEARCH);
    }
    catch (final LDAPException le)
    {
      Debug.debugException(le);
      return new LDAPMessage(messageID,
           new SearchResultDoneProtocolOp(le.toLDAPResult()));
    }

    try
    {
      return downstreamRequestHandler.processSearchRequest(messageID, request,
           controls);
    }
    finally
    {
      releasePermit(searchSemaphore);
    }
  }



  /**
   * Acquires a permit from the provided semaphore.  If the reject timeout is
   * zero, then the attempt fails immediately when no permit is available.
   * Otherwise, the attempt waits up to the reject timeout for a permit to
   * become available.
   *
   * @param  semaphore      The semaphore from which to acquire a permit.  It
   *                        may be {@code null} if no semaphore is needed for
   *                        the associated operation type, in which case this
   *                        method returns immediately.
   * @param  operationType  The type of operation for which the permit is being
   *                        acquired.  Its name is included in the message of
   *                        any exception that is thrown.
   *
   * @throws  LDAPException  If it was not possible to acquire a permit from the
   *                         provided semaphore.  The result code will be
   *                         {@code BUSY} if no permit became available in time,
   *                         or {@code OTHER} if the attempt failed for any
   *                         other reason (including the calling thread being
   *                         interrupted while waiting).
   */
  private void acquirePermit(final Semaphore semaphore,
                             final OperationType operationType)
          throws LDAPException
  {
    if (semaphore == null)
    {
      return;
    }

    try
    {
      if (rejectTimeoutMillis == 0L)
      {
        if (! semaphore.tryAcquire())
        {
          throw new LDAPException(ResultCode.BUSY,
               ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_NO_TIMEOUT.get(
                    operationType.name()));
        }
      }
      else
      {
        if (! semaphore.tryAcquire(rejectTimeoutMillis, TimeUnit.MILLISECONDS))
        {
          throw new LDAPException(ResultCode.BUSY,
               ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_TIMEOUT.get(
                    operationType.name(), rejectTimeoutMillis));
        }
      }
    }
    catch (final LDAPException le)
    {
      // The BUSY rejections created above must be propagated unchanged rather
      // than being wrapped by the generic handler below.
      throw le;
    }
    catch (final Exception e)
    {
      Debug.debugException(e);

      // The timed tryAcquire clears the thread's interrupt status when it
      // throws InterruptedException.  Restore it so that code further up the
      // call stack can still observe that the thread was interrupted.
      if (e instanceof InterruptedException)
      {
        Thread.currentThread().interrupt();
      }

      throw new LDAPException(ResultCode.OTHER,
           ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_SEMAPHORE_EXCEPTION.get(
                operationType.name(), StaticUtils.getExceptionMessage(e)),
           e);
    }
  }



  /**
   * Releases a permit back to the provided semaphore.
   *
   * @param  semaphore  The semaphore to which the permit should be released.
   *                    It may be {@code null} if no semaphore is needed for the
   *                    associated operation type, in which case this method
   *                    does nothing.
   */
  private static void releasePermit(final Semaphore semaphore)
  {
    if (semaphore != null)
    {
      semaphore.release();
    }
  }
}