VecDeque-like fast, unbounded, mpmc/spmc concurrent FIFO message queue. Lockless reads, write-lock writes.

License: Apache-2.0 (LICENSE-APACHE), MIT (LICENSE-MIT)

tower120/rc_event_queue



Warning

This project is deprecated in favor of chute.

Chute is a continuation of this project, featuring truly lock-free MPMC writers that are superlinearly faster in highly concurrent scenarios.

Reader counted event queue

Fast, concurrent FIFO event queue (or message queue). Multiple consumers receive every message.

  • mpmc (multi-producer multi-consumer) - lock-free read, locked write.
  • spmc (single-producer multi-consumer) - lock-free read, lock-free write.

Write operations never block read operations. Performance is consumer-oriented. The memory layout is mostly contiguous. Memory consumption does not grow with the number of readers.

Performance

Very low CPU and memory overhead. Most of the time, a reader does just one atomic load per iter() call. That's all!

Single-threaded.

Read - close to VecDeque! Write:

  • mpmc - push is 2x slower than VecDeque. extend with at least 4 items is close to VecDeque.
  • spmc - equal to VecDeque!
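One plausible reading of the mpmc numbers above is that a locked write pays the lock cost once per call, so extend spreads that cost over several items while push pays it for each one. The sketch below illustrates only that amortization effect. `LockedQueue` and its counter are hypothetical stand-ins built on `Mutex<VecDeque<T>>`, not the crate's implementation.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Mutex, MutexGuard};

/// Hypothetical queue with locked writes (illustration only).
pub struct LockedQueue<T> {
    items: Mutex<VecDeque<T>>,
    // Counts how many times a writer took the lock.
    write_locks: AtomicUsize,
}

impl<T> LockedQueue<T> {
    pub fn new() -> Self {
        Self {
            items: Mutex::new(VecDeque::new()),
            write_locks: AtomicUsize::new(0),
        }
    }

    // Take the write lock and record the acquisition.
    fn lock_for_write(&self) -> MutexGuard<'_, VecDeque<T>> {
        self.write_locks.fetch_add(1, Ordering::Relaxed);
        self.items.lock().unwrap()
    }

    /// One lock acquisition per item.
    pub fn push(&self, value: T) {
        self.lock_for_write().push_back(value);
    }

    /// One lock acquisition for the whole batch.
    pub fn extend<I: IntoIterator<Item = T>>(&self, values: I) {
        self.lock_for_write().extend(values);
    }

    pub fn write_locks(&self) -> usize {
        self.write_locks.load(Ordering::Relaxed)
    }

    pub fn len(&self) -> usize {
        self.items.lock().unwrap().len()
    }
}
```

In this model, pushing four items one by one takes the lock four times, while a single extend with four items takes it once.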

Multi-threaded.

Read - per-thread performance degrades slowly with each additional simultaneously reading thread. (Also remember: since rc_event_queue is a message queue and each reader reads the WHOLE queue, adding more readers does not consume the queue faster.)

Write - per-thread performance degrades almost linearly with each additional simultaneously writing thread (because writes are locked). Not applicable to spmc.

N.B. Without heavy contention, performance is very close to the single-threaded case.

See mpmc benchmarks.

Principle of operation

See doc/principle-of-operation.md.

Short version - EventQueue operates on a per-chunk basis. EventQueue does not touch EventReaders. EventReaders always "pull" from EventQueue. The only way an EventReader interacts with the EventQueue is by incrementing a read counter when it switches to the next chunk during traversal.
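The short version can be modelled with a single-threaded toy. It is a minimal sketch under stated assumptions, not the crate's code: `ToyQueue`, `ToyReader`, the fixed `CHUNK_SIZE`, and the `Cell` counter (standing in for an atomic) are all hypothetical names chosen for illustration. A reader bumps a chunk's counter only when it leaves that chunk, and the queue frees a chunk once every reader has left it.

```rust
use std::cell::Cell;
use std::collections::VecDeque;

const CHUNK_SIZE: usize = 4; // hypothetical fixed chunk size

struct Chunk<T> {
    id: usize,                 // monotonically increasing chunk index
    items: Vec<T>,
    readers_done: Cell<usize>, // readers that have moved past this chunk
}

impl<T> Chunk<T> {
    fn new(id: usize) -> Self {
        Chunk { id, items: Vec::with_capacity(CHUNK_SIZE), readers_done: Cell::new(0) }
    }
}

pub struct ToyQueue<T> {
    chunks: VecDeque<Chunk<T>>, // never empty
    reader_count: usize,
}

pub struct ToyReader {
    chunk_id: usize, // chunk this reader currently occupies
    offset: usize,   // next unread item inside that chunk
}

impl<T> ToyQueue<T> {
    pub fn new() -> Self {
        ToyQueue { chunks: VecDeque::from(vec![Chunk::new(0)]), reader_count: 0 }
    }

    /// New readers start at the oldest retained chunk.
    pub fn subscribe(&mut self) -> ToyReader {
        self.reader_count += 1;
        ToyReader { chunk_id: self.chunks[0].id, offset: 0 }
    }

    /// Append to the last chunk, allocating a new one when it is full.
    pub fn push(&mut self, value: T) {
        let last = self.chunks.back().unwrap();
        if last.items.len() == CHUNK_SIZE {
            let next = Chunk::new(last.id + 1);
            self.chunks.push_back(next);
        }
        self.chunks.back_mut().unwrap().items.push(value);
    }

    /// Free front chunks that every reader has left; returns how many were freed.
    pub fn cleanup(&mut self) -> usize {
        let mut freed = 0;
        while self.chunks.len() > 1
            && self.chunks[0].readers_done.get() == self.reader_count
        {
            self.chunks.pop_front();
            freed += 1;
        }
        freed
    }

    pub fn chunk_count(&self) -> usize {
        self.chunks.len()
    }

    fn chunk(&self, id: usize) -> Option<&Chunk<T>> {
        let front_id = self.chunks[0].id;
        self.chunks.get(id.checked_sub(front_id)?)
    }
}

impl ToyReader {
    /// Pull everything not yet seen by this reader.
    pub fn read_all<T: Clone>(&mut self, queue: &ToyQueue<T>) -> Vec<T> {
        let mut out = Vec::new();
        loop {
            let chunk = queue.chunk(self.chunk_id).expect("occupied chunk is never freed");
            out.extend_from_slice(&chunk.items[self.offset..]);
            self.offset = chunk.items.len();
            if queue.chunk(self.chunk_id + 1).is_none() {
                return out; // no newer chunk yet: stay in this one
            }
            // Switching chunks is the only moment the reader touches the queue.
            chunk.readers_done.set(chunk.readers_done.get() + 1);
            self.chunk_id += 1;
            self.offset = 0;
        }
    }
}
```

With two readers subscribed and ten items pushed, the toy holds three chunks. After only one reader has read, `cleanup` frees nothing. After both have read, it frees the two fully consumed chunks and keeps the one the readers still occupy.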

Usage

API doc

use rc_event_queue::prelude::*;
use rc_event_queue::mpmc::{EventQueue, EventReader};

let event = EventQueue::<usize>::new();
let mut reader1 = EventReader::new(&event);   // borrow, so `event` stays usable
let mut reader2 = EventReader::new(&event);

event.push(1);
event.push(10);
event.push(100);
event.push(1000);

fn sum (mut iter: impl LendingIterator<ItemValue = usize>) -> usize {
    let mut sum = 0;
    while let Some(item) = iter.next() {
        sum += item;
    }
    sum
}

assert!(sum(reader1.iter()) == 1111);
assert!(sum(reader1.iter()) == 0);
assert!(sum(reader2.iter()) == 1111);
assert!(sum(reader2.iter()) == 0);

event.extend(0..10);                 // 0 + 1 + ... + 9 == 45
assert!(sum(reader1.iter()) == 45);
assert!(sum(reader2.iter()) == 45);

clear:

event.push(1);
event.push(10);
event.clear();
event.push(100);
event.push(1000);

assert!(sum(reader1.iter()) == 1100);
assert!(sum(reader2.iter()) == 1100);

clear/truncate_front have a peculiarity: chunks occupied by readers will not be freed immediately.

Emergency cut

If any reader has not read for a long time, it can keep the queue from being cleaned up. This means that queue capacity will grow. On long runs with unpredictable systems, you may want to check total_capacity periodically and, if it grows too much, force-cut/clear the queue.

if event.total_capacity() > 100000 {
    // This will not free chunks occupied by readers, but will free the rest.
    // This should be enough to prevent a memory leak in case some readers
    // unexpectedly stop consuming.
    event.truncate_front(1000);     // leave some of the latest messages to read

    // If you set Settings::MAX_CHUNK_SIZE to a high value,
    // this will reduce the chunk size.
    event.change_chunk_size(2048);

    // If you DO have access to all readers (you probably don't),
    // this will move the readers forward and free the chunks they occupy.
    // Under normal conditions this is not necessary, since readers jump
    // forward to another chunk immediately on the next iter() call.
    for reader in readers {
        reader.update_position();
        // reader.iter();   // this has the same effect as above
    }
}

Even if some reader stops reading forever, you'll only lose/leak the chunk directly occupied by that reader.
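The "check capacity, then keep only the newest messages" policy can be sketched on a plain `VecDeque`. This is an illustration under assumptions: the helper names `truncate_front` and `emergency_cut` below are hypothetical free functions, they work per item rather than per chunk, and they do not model readers pinning chunks.

```rust
use std::collections::VecDeque;

/// Drop the oldest items so that at most `keep` of the newest remain.
/// Returns how many items were dropped.
pub fn truncate_front<T>(queue: &mut VecDeque<T>, keep: usize) -> usize {
    let excess = queue.len().saturating_sub(keep);
    queue.drain(..excess); // dropping the iterator removes the drained range
    excess
}

/// Hypothetical watchdog step: cut the queue only when it outgrew `max_len`.
/// Returns true if a cut happened.
pub fn emergency_cut<T>(queue: &mut VecDeque<T>, max_len: usize, keep: usize) -> bool {
    if queue.len() > max_len {
        truncate_front(queue, keep);
        true
    } else {
        false
    }
}
```

Calling such a step periodically bounds growth while leaving the latest `keep` items available to slow readers.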

Optimisation

CLEANUP

Set CLEANUP to Never in Settings to postpone chunk deallocation.

use rc_event_queue::prelude::*;   // same prelude import as in the Usage example
use rc_event_queue::mpmc::{EventQueue, EventReader, Settings};

struct S{} impl Settings for S{
    const MIN_CHUNK_SIZE: u32 = 4;
    const MAX_CHUNK_SIZE: u32 = 4096;
    const CLEANUP: CleanupMode = CleanupMode::Never;
}

let event = EventQueue::<usize, S>::new();
let mut reader = EventReader::new(&event);   // same constructor as in the Usage example

event.extend(0..10);
sum(reader.iter()); // With CLEANUP != Never, this would cause chunk deallocation

...

event.cleanup();   // Free used chunks
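The Settings pattern above is compile-time configuration through associated constants. The sketch below shows that pattern in isolation. Everything in it is hypothetical: `ToySettings`, `DEFER_CLEANUP`, and especially the doubling growth policy in `next_chunk_size` are assumptions for illustration and are not taken from the crate.

```rust
/// Hypothetical settings trait mirroring the shape of the example above.
pub trait ToySettings {
    const MIN_CHUNK_SIZE: u32;
    const MAX_CHUNK_SIZE: u32;
    /// true = keep fully read chunks until an explicit cleanup call.
    const DEFER_CLEANUP: bool;
}

pub struct Eager;
impl ToySettings for Eager {
    const MIN_CHUNK_SIZE: u32 = 4;
    const MAX_CHUNK_SIZE: u32 = 4096;
    const DEFER_CLEANUP: bool = false;
}

pub struct Deferred;
impl ToySettings for Deferred {
    const MIN_CHUNK_SIZE: u32 = 4;
    const MAX_CHUNK_SIZE: u32 = 16;
    const DEFER_CLEANUP: bool = true;
}

/// Assumed growth policy: double the size, clamped to the configured bounds.
pub fn next_chunk_size<S: ToySettings>(current: u32) -> u32 {
    current
        .saturating_mul(2)
        .clamp(S::MIN_CHUNK_SIZE, S::MAX_CHUNK_SIZE)
}

/// Whether a fully read chunk should be freed right away.
pub fn free_immediately<S: ToySettings>() -> bool {
    !S::DEFER_CLEANUP
}
```

Because the constants are resolved at compile time, each settings type produces its own specialized code with no runtime branching on configuration values.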

double_buffering

Use the double_buffering feature. This will reuse the biggest freed chunk. When EventQueue reaches its optimal size, chunks will just be swapped, without alloc/dealloc.
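The idea of keeping the biggest freed chunk for reuse can be sketched with a one-slot pool. This is a minimal model, not the feature's implementation: `SpareChunk` and its methods are hypothetical, and a `Vec<T>` stands in for a chunk.

```rust
/// Keeps at most one spare buffer: the largest one released so far.
pub struct SpareChunk<T> {
    spare: Option<Vec<T>>,
}

impl<T> SpareChunk<T> {
    pub fn new() -> Self {
        Self { spare: None }
    }

    /// Hand back a freed chunk. It is retained only if it is bigger
    /// than the spare currently held; otherwise it is deallocated.
    pub fn release(&mut self, mut chunk: Vec<T>) {
        chunk.clear(); // drop the items, keep the allocation
        let bigger = self
            .spare
            .as_ref()
            .map_or(true, |s| chunk.capacity() > s.capacity());
        if bigger {
            self.spare = Some(chunk);
        }
    }

    /// Get a chunk with room for `capacity` items, reusing the spare
    /// when it is large enough and allocating otherwise.
    pub fn acquire(&mut self, capacity: usize) -> Vec<T> {
        match self.spare.take() {
            Some(chunk) if chunk.capacity() >= capacity => chunk,
            other => {
                self.spare = other; // put back whatever was there
                Vec::with_capacity(capacity)
            }
        }
    }
}
```

Once the working set stabilizes, `acquire` keeps returning the same allocation that `release` received, so the steady state involves no calls to the allocator.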

Soundness

EventQueue is covered with tests, including Miri tests and Loom tests. See doc/tests.md.
