001/* 002Copyright (c) 2022-2023 Steve Shering 003 004All rights reserved. 005 006As a special exception, the copyright holder of this software gives you permission 007to use this software for personal, not-for-profit purposes. 008 009For any other purpose, a license must be obtained from the copyright holder. 010 011This copyright notice and this permission notice must be included in all copies 012of this software, including copies of parts of this software. 013 014THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 015IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 016FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 017AUTHOR OR COPYRIGHT HOLDER BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 018LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 019OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 020THE SOFTWARE. 021*/ 022package net.sherst.util; 023 024import java.io.Reader; 025import java.util.Collection; 026 027/** 028 * A modified (blocking) thread-safe ring buffer (circular buffer) implementation for Java. 029 * <p> 030 * The {@link add(E)} and {@link addAll(E[])} methods block until there is space in the buffer. 031 * The {@link remove()} and {@link remove(int, Class)} methods block 032 * until there are enough elements in the buffer. 033 * The {@link offer(E)} and {@link poll()} methods (and variants) do not block. 034 * <p> 035 * A ring buffer is a fixed-size first in - first out buffer 036 * typically backed by an array with head and tail pointers. 037 * Elements are added to the tail of the buffer and removed from the head. 038 * When the end of the array is reached, the pointers wrap around to the beginning. 039 * When the buffer is full, adding new elements overwrites the oldest elements 040 * (but see the {@link offer(E)} and {@link offer(E[])} methods in this implementation). 041 * <p> 042 * Pros: Fast, simple. 043 * <p> 044 * Cons: Size fixed at creation time, 045 * adding new elements overwrites the oldest ones if the buffer is full, 046 * elements can only be added or removed in FIFO order. 047 * <p> 048 * There are some useful look-ahead methods, {@link #peek()}, {@link #peek(int)}, {@link #at(E)}, 049 * {@link #at(E[])}, {@link #atSkip(E)} 050 * and {@link #atSkip(E[])}. 051 * <p> 052 * These are simple, hopefully fast, implementations. 053 * They deliberately do not implement Java's {@link java.util.Collection} or {@link java.util.Queue} interfaces, 054 * although the API is similar. 055 * <p> 056 * Variants:<ul> 057 * <li>{@link FastRingBuffer}: Simplest, fastest, not thread safe. 058 * <li>{@link SafeRingBuffer}: Thread safe. 059 * <li>{@link BlockingRingBuffer}: add and remove methods block if necessary; offer and poll methods are non-blocking. 060 * </ul> 061 * <p> 062 * {@link #remove()} sets the underlying array slot to <b>null</b> to avoid memory leaks. 063 * All other {@code remove} and {@code poll} methods call {@link #remove()}. 064 * <p> 065 * {@link #clear()} discards the underlying array to avoid memory leaks. 066 * {@link #removeAll()} calls {@link #clear()}. 067 * <p> 068 * Elements can be {@code null}, but this means that, 069 * if {@link #peek()}, {@link #poll()} or {@link #remove()} return null, 070 * this could be because an element was {@code null} or because the buffer was empty. 071 * Consider using {@link #isEmpty()} or {@link #size()} to disambiguate. 072 * 073 * @param <E> the type of elements in the buffer 074 * @author sherstDotNet@yahoo.com 075 */ 076public class BlockingRingBuffer<E> extends net.sherst.util.SafeRingBuffer<E> { 077 078 /** 079 * Creates a new buffer. 080 * 081 * @param capacity Maximum capacity of the buffer 082 */ 083 public BlockingRingBuffer(int capacity) { 084 super(capacity); 085 } 086 087 /** 088 * Adds an element to the buffer. 089 * Blocks while the buffer is full. 090 * 091 * @param e the element to add 092 * @return {@code true} 093 */ 094 @Override 095 public synchronized boolean add(E e) { 096 while (count>=capacity) { 097 try { 098 wait(); 099 ;} 100 catch (InterruptedException ex) { 101 ;} 102 ;} 103 boolean r=super.add(e); 104 notifyAll(); 105 return r; 106 } 107 108 /** 109 * Adds elements to the buffer, preserving their ordering in the array. 110 * Blocks until there is enough room in the buffer to add all the elements in one go. 111 * 112 * @param a the elements to add 113 * @return {@code true} 114 */ 115 @Override 116 public synchronized boolean addAll(E[] a) { 117 while (count+a.length>capacity) { 118 try { 119 wait(); 120 ;} 121 catch (InterruptedException ex) { 122 ;} 123 ;} 124 boolean r=super.addAll(a); 125 notifyAll(); 126 return r; 127 } 128 129 /** 130 * Adds elements to the buffer, preserving their ordering in the collection. 131 * Blocks until there is enough room in the buffer to add all the elements in one go. 132 * 133 * @param ecol the elements to add 134 * @return {@code true} 135 */ 136 @Override 137 public synchronized boolean addAll(Collection<E> c) { 138 while (count+c.size()>capacity) { 139 try { 140 wait(); 141 ;} 142 catch (InterruptedException ex) { 143 ;} 144 ;} 145 boolean r=super.addAll(c); 146 notifyAll(); 147 return r; 148 } 149 150 /** 151 * Returns {@code true} if the head of the buffer (the next element that will be removed) 152 * is equal to {@code o}, 153 * compared using {@link java.util.Objects#equals}. 154 * 155 * @param o the value to be compared 156 * @return {@code true} if the head of the buffer equals {@code o}; 157 * {@code false} if the buffer is empty 158 */ 159 @Override 160 public synchronized boolean at(E o) { 161 return super.at(o); 162 } 163 164 /** 165 * Returns {@code true} if the elements at the head of the buffer (the next elements that will be removed) 166 * are equal to the contents of {@code a}, 167 * compared using {@link java.util.Objects#equals}. 168 * 169 * @param a the values to be compared 170 * @return {@code true} if the elements at head of the buffer are equal to the elements of {@code a}; 171 * {@code false} if the buffer doesn't contain enough elements to match. 172 */ 173 @Override 174 public synchronized boolean at(E[] a) { 175 return super.at(a); 176 } 177 178 /** 179 * Compares the head of the buffer (the next element that will be removed) to {@code o} 180 * using {@link java.util.Objects#equals} 181 * and removes it from the buffer if there is a match. 182 * 183 * @param o the value to be compared 184 * @return {@code true} if the head of the buffer equals {@code o} and is removed; 185 * {@code false} if the buffer is empty 186 */ 187 @Override 188 public synchronized boolean atSkip(E o) { 189 return super.atSkip(o); 190 } 191 192 /** 193 * Compares the head of the buffer (the next elements that will be removed) to {@code a} 194 * using {@link java.util.Objects#equals} 195 * and removes them from the buffer if there is a match. 196 * 197 * @param a the values to be compared 198 * @return {@code true} if the elements at head of the buffer are equal to the elements of {@code a}; 199 * {@code false} if the buffer doesn't contain enough elements to match. 200 */ 201 @Override 202 public synchronized boolean atSkip(E[] a) { 203 return super.atSkip(a); 204 } 205 206 /** 207 * Empties the buffer. 208 */ 209 @Override 210 public synchronized void clear() { 211 super.clear(); 212 } 213 214 /** 215 * Returns the maximum capacity of this buffer. 216 * 217 * @return the maximum capacity of this buffer 218 */ 219 @Override 220 public int getCapacity() { 221 return super.getCapacity(); 222 } 223 224 /** 225 * Returns {@code true} if the buffer is empty. 226 * 227 * @return {@code true} if the buffer is empty 228 */ 229 @Override 230 public synchronized boolean isEmpty() { 231 return super.isEmpty(); 232 } 233 234 /** 235 * Adds an element to the buffer if there is space for it. 236 * Does not block. 237 * 238 * @param e the element to add. 239 * @return {@code true} if there was space in the buffer and the element was added 240 */ 241 @Override 242 public synchronized boolean offer(E e) { 243 return super.offer(e); 244 } 245 246 /** 247 * Adds elements to the buffer if there is space for them. 248 * Does not block. 249 * 250 * @param e the element to add. 251 * @return {@code true} if there was space in the buffer and the elements were added 252 */ 253 @Override 254 public synchronized boolean offer(E[] a) { 255 return super.offer(a); 256 } 257 258 /** 259 * Adds elements to the buffer if there is space for them. 260 * Does not block. 261 * 262 * @param e the element to add. 263 * @return {@code true} if there was space in the buffer and the elements were added 264 */ 265 @Override 266 public synchronized boolean offer(Collection<E> c) { 267 return super.offer(c); 268 } 269 270 /** 271 * Returns the head of the buffer (the next element that will be removed) without actually removing it. 272 * 273 * @return the element at the head of the buffer or {@code null} if the buffer is empty 274 */ 275 @Override 276 public synchronized E peek() { 277 return super.peek(); 278 } 279 280 /** 281 * Returns the {@code n}<i>th</i> element that will be removed from the buffer, 282 * counting from 0 ({@code peek(0)} returns the first element that will be removed), 283 * or {@code null} if the buffer does not contain that many elements. 284 * 285 * @param n 286 * @return the {@code n}<i>th</i> element that will be removed from the buffer 287 * or {@code null} if the buffer does not contain that many elements 288 */ 289 @Override 290 public synchronized E peek(int n) { 291 return super.peek(n); 292 } 293 294 /** 295 * Removes and returns the element at the head of the buffer, 296 * or {@code null} if the buffer is empty. 297 * 298 * @return the element that was at the head of the buffer which was removed, 299 * or {@code null} if the buffer is empty 300 */ 301 @Override 302 public synchronized E poll() { 303 return super.poll(); 304 } 305 306 /** 307 * Removes and returns {@code n} consecutive elements from the head of the buffer 308 * in the order they were added, or {@code null} if the buffer does not contain {@code n} elements. 309 * 310 * @param n the number of elements to remove 311 * @param cls the {@code Class} of the elements in the buffer (needed to create the array) 312 * @return the elements that were at the head of the buffer which were removed, 313 * or {@code null} if the buffer does not contain {@code n} elements 314 */ 315 @Override 316 public synchronized E[] poll(int n, Class<E> cls) { 317 return super.poll(n, cls); 318 } 319 320 /** 321 * Removes and returns the element at the head of the buffer, 322 * or {@code null} if the buffer is empty. 323 * Blocks while the buffer is empty. 324 * 325 * @return the element that was at the head of the buffer which was removed, 326 * or {@code null} if the buffer is empty 327 */ 328 @Override 329 public synchronized E remove() { 330 while (count<1) { 331 try { 332 wait(); 333 } 334 catch (InterruptedException ex) { 335 } 336 } 337 E r=super.remove(); 338 notifyAll(); 339 return r; 340 } 341 342 /** 343 * Removes and returns up to {@code n} consecutive elements from the head of the buffer 344 * in the order they were added. 345 * Blocks while the buffer does not contain enough elements. 346 * 347 * @param n the number of elements to remove 348 * @param cls the {@code Class} of the elements in the buffer (needed to create the array) 349 * @return the elements that were at the head of the buffer which were removed; 350 * the length of the array reflects the number of elements that were available 351 */ 352 @Override 353 public synchronized E[] remove(int n, Class<E> cls) { 354 while (count<n) { 355 try { 356 wait(); 357 } 358 catch (InterruptedException ex) { 359 } 360 } 361 E[] r=super.remove(n, cls); 362 notifyAll(); 363 return r; 364 } 365 366 /** 367 * Empties the buffer. 368 * 369 * @return {@code true} 370 */ 371 @Override 372 public synchronized boolean removeAll() { 373 return super.removeAll(); 374 } 375 376 /** 377 * Returns the number of elements in the buffer. 378 * 379 * @return the number of elements in the buffer 380 */ 381 @Override 382 public synchronized int size() { 383 return super.size(); 384 } 385 386 /** 387 * Removes {@code n} elements from the head of the buffer (and discards them). 388 * If {@code n}>{@link size()}, only {@link size()} elements are removed. 389 * Does not block if the buffer is empty. 390 * Inspired by and named after similar methods in 391 * {@link java.io.InputStream}s and {@link Reader}s. 392 * 393 * @param n the number of elements to remove 394 * @return the number of elements actually removed 395 */ 396 @Override 397 public synchronized int skip(int n) { 398 return super.skip(n); 399 } 400 }