001/*
002Copyright (c) 2022-2023 Steve Shering
003
004All rights reserved.
005
006As a special exception, the copyright holder of this software gives you permission
007to use this software for personal, not-for-profit purposes.
008
009For any other purpose, a license must be obtained from the copyright holder.
010
011This copyright notice and this permission notice must be included in all copies 
012of this software, including copies of parts of this software.
013
014THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
015IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
016FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
017AUTHOR OR COPYRIGHT HOLDER BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
018LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
019OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
020THE SOFTWARE.
021*/
022package net.sherst.util;
023
024import java.io.Reader;
025import java.util.Collection;
026
027/**
028 * A modified (blocking) thread-safe ring buffer (circular buffer) implementation for Java.
029 * <p>
030 * The {@link add(E)} and {@link addAll(E[])} methods block until there is space in the buffer. 
031 * The {@link remove()} and {@link remove(int, Class)} methods block
032 *  until there are enough elements in the buffer.
033 * The {@link offer(E)} and {@link poll()} methods (and variants) do not block.
034 * <p>
035 * A ring buffer is a fixed-size first in - first out buffer
036 * typically backed by an array with head and tail pointers.
037 * Elements are added to the tail of the buffer and removed from the head.
038 * When the end of the array is reached, the pointers wrap around to the beginning.
039 * When the buffer is full, adding new elements overwrites the oldest elements
040 * (but see the {@link offer(E)} and {@link offer(E[])} methods in this implementation).
041 * <p>
042 * Pros: Fast, simple. 
043 * <p>
044 * Cons: Size fixed at creation time, 
045 * adding new elements overwrites the oldest ones if the buffer is full,
046 * elements can only be added or removed in FIFO order.
047 * <p>
048 * There are some useful look-ahead methods, {@link #peek()}, {@link #peek(int)}, {@link #at(E)},
049 * {@link #at(E[])}, {@link #atSkip(E)} 
050 * and {@link #atSkip(E[])}.
051 * <p>
052 * These are simple, hopefully fast, implementations. 
053 * They deliberately do not implement Java's {@link java.util.Collection} or {@link java.util.Queue} interfaces,
054 * although the API is similar.
055 * <p>
056 * Variants:<ul>
057 * <li>{@link FastRingBuffer}: Simplest, fastest, not thread safe.
058 * <li>{@link SafeRingBuffer}: Thread safe.
059 * <li>{@link BlockingRingBuffer}: add and remove methods block if necessary; offer and poll methods are non-blocking.
060 * </ul>
061 * <p>
062 * {@link #remove()} sets the underlying array slot to <b>null</b> to avoid memory leaks.
063 * All other {@code remove} and {@code poll} methods call {@link #remove()}.  
064 * <p>
065 * {@link #clear()} discards the underlying array to avoid memory leaks. 
066 * {@link #removeAll()} calls {@link #clear()}. 
067 * <p>
068 * Elements can be {@code null}, but this means that,
069 * if {@link #peek()}, {@link #poll()} or {@link #remove()} return null,
070 * this could be because an element was {@code null} or because the buffer was empty.
071 * Consider using {@link #isEmpty()} or {@link #size()} to disambiguate.
072 * 
073 * @param <E> the type of elements in the buffer
074 * @author sherstDotNet@yahoo.com
075 */
076public class BlockingRingBuffer<E> extends net.sherst.util.SafeRingBuffer<E> {
077 
078 /**
079  * Creates a new buffer.
080  * 
081  * @param capacity Maximum capacity of the buffer
082  */
083 public BlockingRingBuffer(int capacity) {
084  super(capacity);
085   }
086 
087 /**
088  * Adds an element to the buffer.
089  * Blocks while the buffer is full.
090  * 
091  * @param e the element to add
092  * @return {@code true}
093  */
094 @Override
095 public synchronized boolean add(E e) {
096    while (count>=capacity) {
097     try {
098      wait();
099      ;}
100      catch (InterruptedException ex) {
101       ;}
102     ;}
103    boolean r=super.add(e);
104    notifyAll();
105    return r;
106    }
107 
108 /**
109  * Adds elements to the buffer, preserving their ordering in the array.
110  * Blocks until there is enough room in the buffer to add all the elements in one go.
111  * 
112  * @param a the elements to add
113  * @return {@code true}
114  */
115 @Override
116 public synchronized boolean addAll(E[] a) {
117    while (count+a.length>capacity) {
118     try {
119      wait();
120      ;}
121      catch (InterruptedException ex) {
122       ;}
123     ;}
124    boolean r=super.addAll(a);
125    notifyAll();
126    return r;
127    }
128 
129 /**
130  * Adds elements to the buffer, preserving their ordering in the collection.
131  * Blocks until there is enough room in the buffer to add all the elements in one go.
132  * 
133  * @param ecol the elements to add
134  * @return {@code true}
135  */
136 @Override
137 public synchronized boolean addAll(Collection<E> c) {
138    while (count+c.size()>capacity) {
139     try {
140      wait();
141      ;}
142      catch (InterruptedException ex) {
143       ;}
144     ;}
145    boolean r=super.addAll(c);
146    notifyAll();
147    return r;
148    }
149 
150 /**
151  * Returns {@code true} if the head of the buffer (the next element that will be removed) 
152  * is equal to {@code o},
153  * compared using {@link java.util.Objects#equals}.
154  * 
155  * @param o the value to be compared
156  * @return {@code true} if the head of the buffer equals {@code o}; 
157  * {@code false} if the buffer is empty
158  */
159 @Override
160 public synchronized boolean at(E o) {
161  return super.at(o);
162  }
163 
164 /**
165  * Returns {@code true} if the elements at the head of the buffer (the next elements that will be removed)
166  * are equal to the contents of {@code a},
167  * compared using {@link java.util.Objects#equals}.
168  * 
169  * @param a the values to be compared
170  * @return {@code true} if the elements at head of the buffer are equal to the elements of {@code a}; 
171  * {@code false} if the buffer doesn't contain enough elements to match.
172  */
173 @Override
174 public synchronized boolean at(E[] a) {
175  return super.at(a);
176  }
177 
178 /**
179  * Compares the head of the buffer (the next element that will be removed) to {@code o}
180  * using {@link java.util.Objects#equals} 
181  * and removes it from the buffer if there is a match. 
182  * 
183  * @param o the value to be compared
184  * @return {@code true} if the head of the buffer equals {@code o} and is removed; 
185  * {@code false} if the buffer is empty
186  */
187 @Override
188 public synchronized boolean atSkip(E o) {
189  return super.atSkip(o);
190  }
191 
192 /**
193  * Compares the head of the buffer (the next elements that will be removed) to {@code a}
194  * using {@link java.util.Objects#equals}
195  * and removes them from the buffer if there is a match.
196  * 
197  * @param a the values to be compared
198  * @return {@code true} if the elements at head of the buffer are equal to the elements of {@code a}; 
199  * {@code false} if the buffer doesn't contain enough elements to match.
200  */
201 @Override
202 public synchronized boolean atSkip(E[] a) {
203  return super.atSkip(a);
204  }
205 
206 /**
207  * Empties the buffer.
208  */
209 @Override
210 public synchronized void clear() {
211  super.clear();
212  }
213 
214 /**
215  * Returns the maximum capacity of this buffer.
216  * 
217  * @return the maximum capacity of this buffer
218  */
219 @Override
220 public int getCapacity() {
221  return super.getCapacity();
222  }
223 
224 /**
225  * Returns {@code true} if the buffer is empty.
226  * 
227  * @return {@code true} if the buffer is empty
228  */
229 @Override
230 public synchronized boolean isEmpty() {
231   return super.isEmpty();
232    }
233 
234 /**
235  * Adds an element to the buffer if there is space for it.  
236  * Does not block.
237  * 
238  * @param e the element to add.
239  * @return {@code true} if there was space in the buffer and the element was added
240  */
241 @Override
242 public synchronized boolean offer(E e) {
243  return super.offer(e);
244   }
245 
246 /**
247  * Adds elements to the buffer if there is space for them.  
248  * Does not block.
249  * 
250  * @param e the element to add.
251  * @return {@code true} if there was space in the buffer and the elements were added
252  */
253 @Override
254 public synchronized boolean offer(E[] a) {
255   return super.offer(a);
256   }
257 
258 /**
259  * Adds elements to the buffer if there is space for them. 
260  * Does not block.
261  * 
262  * @param e the element to add.
263  * @return {@code true} if there was space in the buffer and the elements were added
264  */
265 @Override
266 public synchronized boolean offer(Collection<E> c) {
267   return super.offer(c);
268   }
269  
270 /**
271  * Returns the head of the buffer (the next element that will be removed) without actually removing it.
272  * 
273  * @return the element at the head of the buffer or {@code null} if the buffer is empty
274  */
275 @Override
276 public synchronized E peek() {
277  return super.peek();
278   }
279 
280 /**
281  * Returns the {@code n}<i>th</i> element that will be removed from the buffer, 
282  * counting from 0 ({@code peek(0)} returns the first element that will be removed), 
283  * or {@code null} if the buffer does not contain that many elements.
284  * 
285  * @param n
286  * @return the {@code n}<i>th</i> element that will be removed from the buffer 
287  * or {@code null} if the buffer does not contain that many elements
288  */
289 @Override
290 public synchronized E peek(int n) {
291  return super.peek(n);
292   }
293 
294 /**
295  * Removes and returns the element at the head of the buffer, 
296  * or {@code null} if the buffer is empty.  
297  * 
298  * @return the element that was at the head of the buffer which was removed, 
299  * or {@code null} if the buffer is empty
300  */
301 @Override
302 public synchronized E poll() {
303  return super.poll();
304   }
305 
306 /**
307  * Removes and returns {@code n} consecutive elements from the head of the buffer 
308  * in the order they were added, or {@code null} if the buffer does not contain {@code n} elements. 
309  * 
310  * @param n the number of elements to remove
311  * @param cls the {@code Class} of the elements in the buffer (needed to create the array)
312  * @return the elements that were at the head of the buffer which were removed, 
313  * or {@code null} if the buffer does not contain {@code n} elements
314  */
315 @Override
316 public synchronized E[] poll(int n, Class<E> cls) {
317  return super.poll(n, cls);
318   }
319 
320 /**
321  * Removes and returns the element at the head of the buffer, 
322  * or {@code null} if the buffer is empty.  
323  * Blocks while the buffer is empty.
324  * 
325  * @return the element that was at the head of the buffer which was removed, 
326  * or {@code null} if the buffer is empty
327  */
328 @Override
329 public synchronized E remove() {
330    while (count<1) {
331     try {
332      wait();
333      }
334      catch (InterruptedException ex) {
335       }
336     }
337  E r=super.remove();
338  notifyAll();
339  return r;
340   }
341 
342 /**
343  * Removes and returns up to {@code n} consecutive elements from the head of the buffer 
344  * in the order they were added.
345  * Blocks while the buffer does not contain enough elements.
346  * 
347  * @param n the number of elements to remove
348  * @param cls the {@code Class} of the elements in the buffer (needed to create the array)
349  * @return the elements that were at the head of the buffer which were removed; 
350  * the length of the array reflects the number of elements that were available 
351  */
352 @Override
353 public synchronized E[] remove(int n, Class<E> cls) {
354    while (count<n) {
355     try {
356      wait();
357      }
358      catch (InterruptedException ex) {
359       }
360     }
361  E[] r=super.remove(n, cls);
362  notifyAll();
363  return r;
364   }
365 
366 /**
367  * Empties the buffer.
368  * 
369  * @return {@code true} 
370  */
371 @Override
372 public synchronized boolean removeAll() {
373   return super.removeAll();
374   }
375 
376 /**
377  * Returns the number of elements in the buffer.
378  * 
379  * @return the number of elements in the buffer
380  */
381 @Override
382 public synchronized int size() {
383  return super.size();
384  }
385 
386 /**
387  * Removes {@code n} elements from the head of the buffer (and discards them). 
388  * If {@code n}&gt;{@link size()}, only {@link size()} elements are removed. 
389  * Does not block if the buffer is empty.
390  * Inspired by and named after similar methods in 
391  * {@link java.io.InputStream}s and {@link Reader}s. 
392  * 
393  * @param n the number of elements to remove
394  * @return the number of elements actually removed
395  */
396 @Override
397 public synchronized int skip(int n) {
398    return super.skip(n);
399    }
400  }