समवर्ती डेटा संरचनाओं का सही परीक्षण
एक, दो, तीन, दो
- Rust लाइब्रेरी
loom का उपयोग करके lock-free डेटा संरचनाओं का गहराई से परीक्षण किया जा सकता है
- एक सरल समवर्ती counter का उदाहरण कोड दिया गया है
- कोड में बग यह है कि increment operation atomic नहीं है
use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
#[derive(Default)]
pub struct Counter {
value: AtomicU32,
}
impl Counter {
pub fn increment(&self) {
let value = self.value.load(SeqCst);
self.value.store(value + 1, SeqCst);
}
pub fn get(&self) -> u32 {
self.value.load(SeqCst)
}
}
सरल परीक्षण
- कई threads से उसी counter को बार-बार increment करके परिणाम की जाँच करने वाला test
- test सही तरह से fail होता है, लेकिन timing पर निर्भर होने के कारण इसे दोहराना मुश्किल है
#[test]
fn threaded_test() {
let counter = Counter::default();
let thread_count = 100;
let increment_count = 100;
std::thread::scope(|scope| {
for _ in 0..thread_count {
scope.spawn(|| {
for _ in 0..increment_count {
counter.increment()
}
});
}
});
assert_eq!(counter.get(), thread_count * increment_count);
}
property-based testing (PBT)
- state machine का परीक्षण करने के लिए उपयुक्त property-based testing को लागू करने की कोशिश
- अगर threads को हाथ से step-by-step चलाया जा सके, तो दूसरे thread के load और store के बीच उन्हें आसानी से डाला जा सकता है
#[test]
fn state_machine_test() {
arbtest::arbtest(|rng| {
let mut state: i32 = 0;
let step_count: usize = rng.int_in_range(0..=100)?;
for _ in 0..step_count {
match *rng.choose(&["inc", "dec"])? {
"inc" => state += 1,
"dec" => state -= 1,
_ => unreachable!(),
}
}
Ok(())
});
}
सरल instrumentation
- threads को atomic operations के बीच "pause" करने देने का एक तरीका
pub fn increment(&self) {
pause();
let value = self.value.load(SeqCst);
pause();
self.value.store(value + 1, SeqCst);
pause();
}
fn pause() {
// ¯\\_(ツ)_/¯
}
managed thread API
- API design का एक नियम यह है कि पहले single-user version से शुरुआत करें, API का feel समझें, फिर actual implementation करें
- managed threads का उपयोग करके property-based test लिखना
let counter = Counter::default();
let t1 = managed_thread::spawn(&counter);
let t2 = managed_thread::spawn(&counter);
while !rng.is_empty() {
let coin_flip: bool = rng.arbitrary()?;
if t1.is_paused() {
if coin_flip {
t1.unpause();
}
} else if t2.is_paused() {
if coin_flip {
t2.unpause();
}
}
}
managed thread implementation
- control thread और managed thread के बीच संचार की आवश्यकता
- state को सुरक्षित रखने के लिए mutex और condition variable का उपयोग करके implementation
struct SharedContext {
state: Mutex<State>,
cv: Condvar,
}
#[derive(PartialEq, Eq, Default)]
enum State {
#[default]
Running,
Paused,
}
impl SharedContext {
fn pause(&self) {
let mut guard = self.state.lock().unwrap();
assert_eq!(*guard, State::Running);
*guard = State::Paused;
self.cv.notify_all();
guard = self.cv.wait_while(guard, |state| *state == State::Paused).unwrap();
assert_eq!(*guard, State::Running);
}
}
पूरा code integration
- managed threads और test code का integration
#[test]
fn test_counter() {
arbtest::arbtest(|rng| {
eprintln!("begin trace");
let counter = Counter::default();
let mut counter_model: u32 = 0;
std::thread::scope(|scope| {
let t1 = managed_thread::spawn(scope, &counter);
let t2 = managed_thread::spawn(scope, &counter);
let mut threads = [t1, t2];
while !rng.is_empty() {
for (tid, t) in threads.iter_mut().enumerate() {
if rng.arbitrary()? {
if t.is_paused() {
eprintln!("{tid}: unpause");
t.unpause()
} else {
eprintln!("{tid}: increment");
t.submit(|c| c.increment());
counter_model += 1;
}
}
}
}
for t in threads {
t.join();
}
assert_eq!(counter_model, counter.get());
Ok(())
})
});
}
GN⁺ का सारांश
- यह लेख समवर्ती डेटा संरचनाओं का परीक्षण करने का तरीका समझाता है
- यह Rust की
loom लाइब्रेरी का उपयोग करके non-atomic operations का परीक्षण करने के तरीकों का अध्ययन करता है
- managed threads का उपयोग करके concurrency समस्याओं को reproducible और debuggable तरीके से test किया जाता है
- यह लेख concurrency programming में रुचि रखने वाले developers के लिए उपयोगी होगा
- समान क्षमता वाला एक प्रोजेक्ट Java का
JCStress है
1 टिप्पणियां
Hacker News राय
Rust में Temper नाम की एक लाइब्रेरी विकसित की जा रही है, और Rust memory model के जटिल हिस्सों को संभालने के लिए काफी मेहनत करनी पड़ रही है
Rust में shared-memory atomic snapshot लागू किया गया था, और automated testing को बहुत महत्वपूर्ण माना जाता है
इस approach की कमी यह है कि test code के अनुसार code itself में बदलाव करने पड़ते हैं
JetBrains का Lincheck Kotlin/Java ecosystem में एक अच्छी लाइब्रेरी है
यह जानने की जिज्ञासा है कि क्या C++ के लिए "Loom" जैसी कोई लाइब्रेरी है
इस approach की soft progress guarantees के मामले में सीमाएँ हो सकती हैं
व्यावहारिक ज्ञान की ज़रूरत होती है, और वास्तविक threads बनानी पड़ती हैं
ptrace का उपयोग करके threads को single-step में चलाया जा सकता है ताकि instruction level पर अलग interleavings बनाई जा सकें
Loom का उपयोग करने के लिए conditional compilation का सहारा लेना पड़ता है, जो थोड़ा intrusive है
Python में यही काम कैसे किया जाए, यह जानना चाहते हैं