Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: port fix discard all message on receiver droped #1121

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crossbeam-channel/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ impl<T> Drop for Sender<T> {
fn drop(&mut self) {
unsafe {
match &self.flavor {
SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
SenderFlavor::Array(chan) => chan.release(|c| c.disconnect_senders()),
SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
}
Expand Down Expand Up @@ -1159,7 +1159,7 @@ impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
unsafe {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect_receivers()),
ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
ReceiverFlavor::At(_) => {}
Expand Down
128 changes: 85 additions & 43 deletions crossbeam-channel/src/flavors/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use std::boxed::Box;
use std::cell::UnsafeCell;
use std::mem::{self, MaybeUninit};
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicUsize, Ordering};
use std::time::Instant;
Expand Down Expand Up @@ -476,21 +476,102 @@ impl<T> Channel<T> {
Some(self.cap())
}

/// Disconnects the channel and wakes up all blocked senders and receivers.
/// Disconnects senders and wakes up all blocked senders and receivers.
///
/// Returns `true` if this call disconnected the channel.
pub(crate) fn disconnect(&self) -> bool {
pub(crate) fn disconnect_senders(&self) -> bool {
let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);

if tail & self.mark_bit == 0 {
self.senders.disconnect();
self.receivers.disconnect();
true
} else {
false
}
}

/// Disconnects receivers and wakes up all blocked senders.
///
/// Returns `true` if this call disconnected the channel.
///
/// # Safety
/// May only be called once upon dropping the last receiver. The
/// destruction of all other receivers must have been observed with acquire
/// ordering or stronger.
pub(crate) unsafe fn disconnect_receivers(&self) -> bool {
let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
let disconnected = if tail & self.mark_bit == 0 {
self.senders.disconnect();
true
} else {
false
};

unsafe {
self.discard_all_messages(tail);
}

disconnected
}

/// Discards all messages.
///
/// `tail` should be the current (and therefore last) value of `tail`.
///
/// # Panicking
/// If a destructor panics, the remaining messages are leaked, matching the
/// behaviour of the unbounded channel.
///
/// # Safety
/// This method must only be called when dropping the last receiver. The
/// destruction of all other receivers must have been observed with acquire
/// ordering or stronger.
unsafe fn discard_all_messages(&self, tail: usize) {
debug_assert!(self.is_disconnected());

// Only receivers modify `head`, so since we are the last one,
// this value will not change and will not be observed (since
// no new messages can be sent after disconnection).
let mut head = self.head.load(Ordering::Relaxed);
let tail = tail & !self.mark_bit;

let backoff = Backoff::new();
loop {
// Deconstruct the head.
let index = head & (self.mark_bit - 1);
let lap = head & !(self.one_lap - 1);

// Inspect the corresponding slot.
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);

// If the stamp is ahead of the head by 1, we may drop the message.
if head + 1 == stamp {
head = if index + 1 < self.cap() {
// Same lap, incremented index.
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
head + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
lap.wrapping_add(self.one_lap)
};

unsafe {
(*slot.msg.get()).assume_init_drop();
}
// If the tail equals the head, that means the channel is empty.
} else if tail == head {
return;
// Otherwise, a sender is about to write into the slot, so we need
// to wait for it to update the stamp.
} else {
backoff.spin();
}
}
}

/// Returns `true` if the channel is disconnected.
pub(crate) fn is_disconnected(&self) -> bool {
self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
Expand Down Expand Up @@ -521,45 +602,6 @@ impl<T> Channel<T> {
}
}

impl<T> Drop for Channel<T> {
fn drop(&mut self) {
if mem::needs_drop::<T>() {
// Get the index of the head.
let head = *self.head.get_mut();
let tail = *self.tail.get_mut();

let hix = head & (self.mark_bit - 1);
let tix = tail & (self.mark_bit - 1);

let len = if hix < tix {
tix - hix
} else if hix > tix {
self.cap() - hix + tix
} else if (tail & !self.mark_bit) == head {
0
} else {
self.cap()
};

// Loop over all slots that hold a message and drop them.
for i in 0..len {
// Compute the index of the next slot holding a message.
let index = if hix + i < self.cap() {
hix + i
} else {
hix + i - self.cap()
};

unsafe {
debug_assert!(index < self.buffer.len());
let slot = self.buffer.get_unchecked_mut(index);
(*slot.msg.get()).assume_init_drop();
}
}
}
}
}

/// Receiver handle to a channel.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);

Expand Down
12 changes: 12 additions & 0 deletions crossbeam-channel/tests/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,3 +742,15 @@ fn panic_on_drop() {
// Elements after the panicked element will leak.
assert!(!b);
}

#[test]
fn drop_unreceived() {
let (tx, rx) = bounded::<std::rc::Rc<()>>(1);
let msg = std::rc::Rc::new(());
let weak = std::rc::Rc::downgrade(&msg);
assert!(tx.send(msg).is_ok());
drop(rx);
// Messages should be dropped immediately when the last receiver is destroyed.
assert!(weak.upgrade().is_none());
drop(tx);
}
Loading