std::thread - reading from file line by line
I would like to read an output file line by line in parallel: each thread reads one line and then processes its data, while at the same time the next thread reads the next line.
#include <fstream>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

const int num = 5;
std::ifstream infile("test.txt");
std::mutex mtx;

void read(int id_thread){
    while(infile.good()){
        mtx.lock();
        std::string sLine;
        getline(infile, sLine);
        std::cout << "Read by thread: " << id_thread;
        std::cout << sLine << std::endl;
        mtx.unlock();
    }
}

int main(){
    std::vector<std::thread> threads;
    for(int i = 0; i < num; i++){
        threads.push_back(std::thread(read, i));
    }
    for(auto& thread : threads){
        thread.join();
    }
    return 0;
}
When I run this code, the first thread reads all the lines. How can I make it so that each thread reads one line?
EDIT
As mentioned in the comments, all I had to do was use a larger test file. Thanks guys!
I would change the loop to
while(infile.good()){
    mtx.lock();
    std::string sLine;
    getline(infile, sLine);
    mtx.unlock();
    std::cout << "Read by thread: " << id_thread;
    std::cout << sLine << std::endl;
}
Your std::cout calls are the busy part of your test loop, the part you will later exchange for real code, so keep them outside the lock; that gives the other threads a chance to enter the critical section. Also, make the test file large. It is not uncommon for thread initialization to take some time, during which the first thread consumes all the data.
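If you want to see the interleaving without hunting for a big input, one option (not from the original answer) is to generate a sizeable test file up front and simulate the per-line processing with a short sleep outside the lock. A minimal sketch, where the file name, line count, and sleep duration are arbitrary choices:

#include <chrono>
#include <fstream>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

std::ifstream infile;
std::mutex mtx;

// Write enough lines that all threads are running before the file is exhausted.
void makeTestFile(const char* name, int lines){
    std::ofstream out(name);
    for(int i = 0; i < lines; i++)
        out << "line " << i << '\n';
}

void read(int id_thread){
    while(true){
        std::string sLine;
        {
            std::lock_guard<std::mutex> guard(mtx); // lock only around the shared stream
            if(!std::getline(infile, sLine))
                return;                             // stop once the file is exhausted
        }
        // simulate the real per-line work outside the lock
        std::this_thread::sleep_for(std::chrono::microseconds(50));
        std::lock_guard<std::mutex> guard(mtx);     // serialize output so lines stay readable
        std::cout << "Read by thread: " << id_thread << " " << sLine << std::endl;
    }
}

int main(){
    makeTestFile("test.txt", 100000);
    infile.open("test.txt");
    std::vector<std::thread> threads;
    for(int i = 0; i < 5; i++)
        threads.push_back(std::thread(read, i));
    for(auto& thread : threads)
        thread.join();
    return 0;
}

With the lock limited to the stream access and the simulated work outside it, the lines should come out interleaved across threads even on a fast machine.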
If you want your 5 threads to read exactly every fifth line, you must synchronize the reads, so each thread has to know that the previous one has finished reading its part. This requirement potentially imposes huge inefficiency, as some threads can wait a long time for the previous one to finish.
Conceptual code, untested; use at your own risk.
Let's first make a small class for handling the atomic lock. We align it to the cache line to avoid false sharing and the associated cache-line ping-pong.
#include <atomic>
#include <fstream>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#if defined(__x86_64__) || defined(_M_X64)
#include <immintrin.h>
#endif

// On x86 the PAUSE instruction waits roughly one L2 latency; elsewhere just yield.
inline void pause() {
#if defined(__x86_64__) || defined(_M_X64)
    _mm_pause();
#else
    std::this_thread::yield();
#endif
}

constexpr size_t CACHELINESIZE = 64; // could differ on your architecture

template<class dType>
class alignas(CACHELINESIZE) lockstep {
    std::atomic<dType> lock = dType(0);
public:
    // Spinlock spins until the stored value equals prev and then tries to set
    // the lock to next; if the CAS fails, the spin restarts.
    dType Spinlock(dType prev = dType(0), dType next = dType(1)) {
        dType expected = prev;
        while (!lock.compare_exchange_weak(expected, next)) { // requests the line exclusively, ~100 cycles?
            expected = prev; // we still want to wait for the expected value
            do {
                pause();
            } while (lock.load(std::memory_order_relaxed) != prev); // only one cache miss per change
        }
        return expected;
    }

    void store(dType value) { lock.store(value); }
};
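As a side illustration (not part of the original answer), here is how the Spinlock/store handshake passes a token between threads: each thread spins until the counter equals the value it is waiting for, does its work, and then stores the value that releases the next thread. A hypothetical worker/demo pair, assuming the lockstep class above:

lockstep<int> token; // starts at 0

void worker(int id){ // id is 0 or 1
    for(int round = 0; round < 3; round++){
        int mine = 2 * round + id;   // the counter value this thread waits for
        token.Spinlock(mine, mine);  // spin until it is our turn
        std::cout << "thread " << id << " holds token " << mine << std::endl;
        token.store(mine + 1);       // release the other thread
    }
}

void demo(){
    std::thread a(worker, 0), b(worker, 1);
    a.join();
    b.join();
}

The file-reading code below uses the same pattern, with line numbers as the token.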
lockstep<int> lock;          // shared line counter, starts at 0
constexpr int NoThreads = 5;
std::ifstream infile("test.txt");
void read(int id_thread) {
    bool izNoGood = false;
    int next = id_thread;            // the line number this thread handles first
    while(!izNoGood){
        // get the lock for the next iteration
        lock.Spinlock(next, next);   // wait on our number
        // the file check is moved into the locked region
        izNoGood = !infile.good();
        if (izNoGood) {
            lock.store(next + 1);    // release the next thread so it can end its run too
            return;
        }
        std::string sLine;
        std::getline(infile, sLine);
        // release the next thread
        lock.store(next + 1);
        // do the work asynchronously
        // ...
        // debug log; hopefully the whole line gets written in one go (atomic),
        // but it can appear in "random" order relative to other lines.
        std::cout << "Read by thread: " << id_thread << " line no. " << next
                  << " text:" << sLine << std::endl; // endl flushes cout, implicit sync?
        next += NoThreads;           // our next expected line to process
    }
}
int main() {
    std::vector<std::thread> threads;
    for(int i = 0; i < NoThreads; i++) {
        threads.push_back(std::thread(read, i));
    }
    for(auto& thread : threads){
        thread.join();
    }
    return 0;
}