2007-10-11 Marcus Brinkmann <marcus@g10code.de>
[gpgme.git] / gpgme / kdpipeiodevice.cpp
1 /*
2   Copyright (C) 2007 Klarälvdalens Datakonsult AB
3
4   KDPipeIODevice is free software; you can redistribute it and/or
5   modify it under the terms of the GNU Library General Public
6   License as published by the Free Software Foundation; either
7   version 2 of the License, or (at your option) any later version.
8
9   KDPipeIODevice is distributed in the hope that it will be useful,
10   but WITHOUT ANY WARRANTY; without even the implied warranty of
11   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12   GNU Library General Public License for more details.
13
14   You should have received a copy of the GNU Library General Public License
15   along with KDPipeIODevice; see the file COPYING.LIB.  If not, write to the
16   Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
17   Boston, MA 02110-1301, USA.
18 */
19
20 #include "kdpipeiodevice.h"
21
22 #include <QtCore>
23
24 #include <cassert>
25 #include <memory>
26 #include <algorithm>
27
28 #ifdef Q_OS_WIN32
29 # ifndef NOMINMAX
30 #  define NOMINMAX
31 # endif
32 # include <windows.h>
33 # include <io.h>
34 #else
35 # include <unistd.h>
36 # include <errno.h>
37 #endif
38
39 using namespace _gpgme_;
40
41 #ifndef KDAB_CHECK_THIS
42 # define KDAB_CHECK_CTOR (void)1
43 # define KDAB_CHECK_DTOR KDAB_CHECK_CTOR
44 # define KDAB_CHECK_THIS KDAB_CHECK_CTOR
45 #endif
46
47 #define LOCKED( d ) const QMutexLocker locker( &d->mutex )
48 #define synchronized( d ) if ( int i = 0 ) {} else for ( const QMutexLocker locker( &d->mutex ) ; !i ; ++i )
49
50 const unsigned int BUFFER_SIZE = 4096;
51 const bool ALLOW_QIODEVICE_BUFFERING = true;
52
53 // comment to get trace output:
54 //#define qDebug if(1){}else qDebug
55
56 namespace {
57 class Reader : public QThread {
58     Q_OBJECT
59 public:
60     Reader( int fd, Qt::HANDLE handle );
61     ~Reader();
62
63     qint64 readData( char * data, qint64 maxSize );
64
65     unsigned int bytesInBuffer() const {
66         return ( wptr + sizeof buffer - rptr ) % sizeof buffer ;
67     }
68
69     bool bufferFull() const {
70         return bytesInBuffer() == sizeof buffer - 1;
71     }
72
73     bool bufferEmpty() const {
74         return bytesInBuffer() == 0;
75     }
76
77     bool bufferContains( char ch ) {
78         const unsigned int bib = bytesInBuffer();
79         for ( unsigned int i = rptr ; i < rptr + bib ; ++i )
80             if ( buffer[i%sizeof buffer] == ch )
81                 return true;
82         return false;
83     }
84         
85     void notifyReadyRead();
86
87 Q_SIGNALS:
88     void readyRead();
89
90 protected:
91     /* reimp */ void run();
92
93 private:
94     int fd;
95     Qt::HANDLE handle;
96 public:
97     QMutex mutex;
98     QWaitCondition waitForCancelCondition;
99     QWaitCondition bufferNotFullCondition;
100     QWaitCondition bufferNotEmptyCondition;
101     QWaitCondition hasStarted;
102     QWaitCondition readyReadSentCondition;
103     QWaitCondition blockedConsumerIsDoneCondition;
104     bool cancel;
105     bool eof;
106     bool error;
107     bool eofShortCut;
108     int errorCode;
109     bool consumerBlocksOnUs;
110    
111 private:
112     unsigned int rptr, wptr;
113     char buffer[BUFFER_SIZE+1]; // need to keep one byte free to detect empty state
114 };
115
116
117 Reader::Reader( int fd_, Qt::HANDLE handle_ )
118     : QThread(),
119       fd( fd_ ),
120       handle( handle_ ),
121       mutex(),
122       bufferNotFullCondition(),
123       bufferNotEmptyCondition(),
124       hasStarted(),
125       cancel( false ),
126       eof( false ),
127       error( false ),
128       eofShortCut( false ),
129       errorCode( 0 ),
130       consumerBlocksOnUs( false ),
131       rptr( 0 ), wptr( 0 )
132 {
133     
134 }
135
136 Reader::~Reader() {}
137
138
139 class Writer : public QThread {
140     Q_OBJECT
141 public:
142     Writer( int fd, Qt::HANDLE handle );
143     ~Writer();
144
145     qint64 writeData( const char * data, qint64 size );
146
147     unsigned int bytesInBuffer() const { return numBytesInBuffer; }
148
149     bool bufferFull() const {
150         return numBytesInBuffer == sizeof buffer;
151     }
152
153     bool bufferEmpty() const {
154         return numBytesInBuffer == 0;
155     }
156
157 Q_SIGNALS:
158     void bytesWritten( qint64 );
159
160 protected:
161     /* reimp */ void run();
162
163 private:
164     int fd;
165     Qt::HANDLE handle;
166 public:
167     QMutex mutex;
168     QWaitCondition bufferEmptyCondition;
169     QWaitCondition bufferNotEmptyCondition;
170     QWaitCondition hasStarted;
171     bool cancel;
172     bool error;
173     int errorCode;
174 private:
175     unsigned int numBytesInBuffer;
176     char buffer[BUFFER_SIZE];
177 };
178 }
179
180 Writer::Writer( int fd_, Qt::HANDLE handle_ )
181     : QThread(),
182       fd( fd_ ),
183       handle( handle_ ),
184       mutex(),
185       bufferEmptyCondition(),
186       bufferNotEmptyCondition(),
187       hasStarted(),
188       cancel( false ),
189       error( false ),
190       errorCode( 0 ),
191       numBytesInBuffer( 0 )
192 {
193
194 }
195
196 Writer::~Writer() {}
197
198
199 class KDPipeIODevice::Private : public QObject {
200 Q_OBJECT
201     friend class ::KDPipeIODevice;
202     KDPipeIODevice * const q;
203 public:
204     explicit Private( KDPipeIODevice * qq );
205     ~Private();
206
207     bool doOpen( int, Qt::HANDLE, OpenMode );
208     bool startReaderThread(); 
209     bool startWriterThread(); 
210     void stopThreads();
211
212 public Q_SLOTS:
213     void emitReadyRead();
214  
215 private:
216     int fd;
217     Qt::HANDLE handle;
218     Reader * reader;
219     Writer * writer;
220     bool triedToStartReader;
221     bool triedToStartWriter;
222 };
223
224 KDPipeIODevice::Private::Private( KDPipeIODevice * qq )
225     : QObject( qq ), q( qq ),
226       fd( -1 ),
227       handle( 0 ),
228       reader( 0 ),
229       writer( 0 ),
230       triedToStartReader( false ), triedToStartWriter( false ) 
231 {
232
233 }
234
235 KDPipeIODevice::Private::~Private() {
236     qDebug( "KDPipeIODevice::~Private(): Destroying %p", q );
237 }
238
239 KDPipeIODevice::KDPipeIODevice( QObject * p )
240     : QIODevice( p ), d( new Private( this ) )
241 {
242     KDAB_CHECK_CTOR;
243 }
244
245 KDPipeIODevice::KDPipeIODevice( int fd, OpenMode mode, QObject * p )
246     : QIODevice( p ), d( new Private( this ) )
247 {
248     KDAB_CHECK_CTOR;
249     open( fd, mode );
250 }
251
252 KDPipeIODevice::KDPipeIODevice( Qt::HANDLE handle, OpenMode mode, QObject * p )
253     : QIODevice( p ), d( new Private( this ) )
254 {
255     KDAB_CHECK_CTOR;
256     open( handle, mode );
257 }
258
259 KDPipeIODevice::~KDPipeIODevice() { KDAB_CHECK_DTOR;
260     if ( isOpen() )
261         close();
262     delete d; d = 0;
263 }
264
265
266 bool KDPipeIODevice::open( int fd, OpenMode mode ) { KDAB_CHECK_THIS;
267
268 #ifdef Q_OS_WIN32
269     return d->doOpen( fd, (HANDLE)_get_osfhandle( fd ), mode );
270 #else
271     return d->doOpen( fd, 0, mode );
272 #endif
273
274 }
275
276 bool KDPipeIODevice::open( Qt::HANDLE h, OpenMode mode ) { KDAB_CHECK_THIS;
277
278 #ifdef Q_OS_WIN32
279     return d->doOpen( 0, h, mode );
280 #else
281     Q_UNUSED( h );
282     Q_UNUSED( mode );
283     assert( !"KDPipeIODevice::open( Qt::HANDLE, OpenMode ) should never be called except on Windows." );
284 #endif
285
286 }
287
288 bool KDPipeIODevice::Private::startReaderThread()
289 {
290    if ( triedToStartReader )
291        return true;
292    triedToStartReader = true;    
293    if ( reader && !reader->isRunning() && !reader->isFinished() ) {
294        qDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)" );
295        LOCKED( reader );
296        qDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)" );
297        reader->start( QThread::HighestPriority );
298        qDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)" );
299        const bool hasStarted = reader->hasStarted.wait( &reader->mutex, 1000 );
300        qDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)" );
301
302        return hasStarted;
303    }
304    return true;
305 }
306
307 bool KDPipeIODevice::Private::startWriterThread()
308 {
309    if ( triedToStartWriter )
310        return true;
311    triedToStartWriter = true;    
312    if ( writer && !writer->isRunning() && !writer->isFinished() ) {
313        LOCKED( writer );
314        
315        writer->start( QThread::HighestPriority );
316        if ( !writer->hasStarted.wait( &writer->mutex, 1000 ) )
317             return false;
318    }
319    return true;
320 }
321
322 void KDPipeIODevice::Private::emitReadyRead()
323 {
324     static int s_counter = 0;
325     const int counter = s_counter++;
326     QPointer<Private> thisPointer( this );
327     qDebug( "KDPipeIODevice::Private::emitReadyRead %p, %d", this, counter );
328
329     emit q->readyRead();
330
331     if ( !thisPointer )
332         return;
333     qDebug( "KDPipeIODevice::Private::emitReadyRead %p, %d: locking reader (CONSUMER THREAD)", this, counter );
334     synchronized( reader ) {
335         qDebug( "KDPipeIODevice::Private::emitReadyRead %p, %d: locked reader (CONSUMER THREAD)", this, counter );
336         reader->readyReadSentCondition.wakeAll();
337     }
338     qDebug( "KDPipeIODevice::Private::emitReadyRead %p leaving %d", this, counter );
339
340 }
341
342 bool KDPipeIODevice::Private::doOpen( int fd_, Qt::HANDLE handle_, OpenMode mode_ ) {
343
344     if ( q->isOpen() || fd_ < 0 )
345         return false;
346
347 #ifdef Q_OS_WIN32
348     if ( !handle_ )
349         return false;
350 #endif
351
352     if ( !(mode_ & ReadWrite) )
353         return false; // need to have at least read -or- write
354
355
356     std::auto_ptr<Reader> reader_;
357     std::auto_ptr<Writer> writer_;
358
359     if ( mode_ & ReadOnly ) {
360         reader_.reset( new Reader( fd_, handle_ ) );
361         qDebug( "KDPipeIODevice::doOpen (%p): created reader (%p) for fd %d", this, reader_.get(), fd_ ); 
362         connect( reader_.get(), SIGNAL(readyRead()), this, SLOT(emitReadyRead()), 
363 Qt::QueuedConnection );
364     }
365     if ( mode_ & WriteOnly ) {
366         writer_.reset( new Writer( fd_, handle_ ) );
367         qDebug( "KDPipeIODevice::doOpen (%p): created writer (%p) for fd %d", this, writer_.get(), fd_ ); 
368         connect( writer_.get(), SIGNAL(bytesWritten(qint64)), q, SIGNAL(bytesWritten(qint64)), 
369 Qt::QueuedConnection );
370     }
371
372     // commit to *this:
373     fd = fd_;
374     handle = handle_;
375     reader = reader_.release();
376     writer = writer_.release();
377
378     q->setOpenMode( mode_|Unbuffered );
379     return true;
380 }
381
382 int KDPipeIODevice::descriptor() const { KDAB_CHECK_THIS;
383     return d->fd;
384 }
385
386
387 Qt::HANDLE KDPipeIODevice::handle() const { KDAB_CHECK_THIS;
388     return d->handle;
389 }
390
391 qint64 KDPipeIODevice::bytesAvailable() const { KDAB_CHECK_THIS;
392     const qint64 base = QIODevice::bytesAvailable();
393     if ( !d->triedToStartReader ) {
394          d->startReaderThread();
395          return base;
396     }
397     if ( d->reader )
398         synchronized( d->reader ) {
399             const qint64 inBuffer = d->reader->bytesInBuffer();     
400             return base + inBuffer;
401        }
402     return base;
403 }
404
405 qint64 KDPipeIODevice::bytesToWrite() const { KDAB_CHECK_THIS;
406     d->startWriterThread();
407     const qint64 base = QIODevice::bytesToWrite();
408     if ( d->writer )
409         synchronized( d->writer ) return base + d->writer->bytesInBuffer();
410     return base;
411 }
412
413 bool KDPipeIODevice::canReadLine() const { KDAB_CHECK_THIS;
414     d->startReaderThread();
415    if ( QIODevice::canReadLine() )
416         return true;
417     if ( d->reader )
418         synchronized( d->reader ) return d->reader->bufferContains( '\n' );
419     return true;
420 }
421
422 bool KDPipeIODevice::isSequential() const {
423     return true;
424 }
425
426 bool KDPipeIODevice::atEnd() const { KDAB_CHECK_THIS;
427     d->startReaderThread();
428     if ( !QIODevice::atEnd() ) {
429         qDebug( "%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", this, static_cast<long>(bytesAvailable()) );
430         return false;
431     }
432     if ( !isOpen() )
433         return true;
434     if ( d->reader->eofShortCut )
435         return true;
436     LOCKED( d->reader );
437     const bool eof = ( d->reader->error || d->reader->eof ) && d->reader->bufferEmpty();
438     if ( !eof ) {
439         if ( !d->reader->error && !d->reader->eof )
440             qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof", this );
441         if ( !d->reader->bufferEmpty() )
442             qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()", this );
443     }
444     return eof;
445 }
446
447 bool KDPipeIODevice::waitForBytesWritten( int msecs ) { KDAB_CHECK_THIS;
448     d->startWriterThread();
449     Writer * const w = d->writer;
450     if ( !w )
451         return true;
452     LOCKED( w );
453     qDebug( "KDPipeIODevice::waitForBytesWritten (%p,w=%p): entered locked area", this, w 
454 ); 
455     return w->bufferEmpty() || w->error || w->bufferEmptyCondition.wait( &w->mutex, msecs ) ;
456 }
457
458 bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS;
459     qDebug( "KDPipeIODEvice::waitForReadyRead()(%p)", this);
460     d->startReaderThread();
461     if ( ALLOW_QIODEVICE_BUFFERING ) {
462         if ( bytesAvailable() > 0 )
463             return true;
464     }
465     Reader * const r = d->reader;
466     if ( !r || r->eofShortCut )
467         return true;
468     LOCKED( r );
469     if ( r->bytesInBuffer() != 0 || r->eof || r->error )
470         return true;
471     assert( false );
472     return r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) ;
473 }
474
475 template <typename T>
476 class TemporaryValue {
477 public:
478    TemporaryValue( T& var_, const T& tv ) : var( var_ ), oldValue( var_ ) { var = tv; }
479    ~TemporaryValue() { var = oldValue; }
480 private:   
481    T& var;
482    const T oldValue;
483 }; 
484
485
486 bool KDPipeIODevice::readWouldBlock() const
487 {
488    d->startReaderThread();
489    LOCKED( d->reader );
490    return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error;
491 }  
492
493 bool KDPipeIODevice::writeWouldBlock() const
494 {
495    d->startWriterThread();
496    LOCKED( d->writer );
497    return !d->writer->bufferEmpty() && !d->writer->error;
498 }  
499
500
501 qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS;
502     qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", this, data, maxSize );
503     d->startReaderThread();
504     Reader * const r = d->reader;
505
506     assert( r );
507
508
509     //assert( r->isRunning() ); // wrong (might be eof, error)
510     assert( data || maxSize == 0 );
511     assert( maxSize >= 0 );
512
513     if ( r->eofShortCut ) {
514         qDebug( "%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", this );
515         return 0;
516     }
517
518     if ( maxSize < 0 )
519         maxSize = 0;
520
521     if ( ALLOW_QIODEVICE_BUFFERING ) {
522         if ( bytesAvailable() > 0 )
523             maxSize = std::min( maxSize, bytesAvailable() ); // don't block
524     }
525     qDebug( "%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)", this );
526     LOCKED( r );
527     qDebug( "%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)", this );
528
529     r->readyReadSentCondition.wakeAll();
530     if ( /* maxSize > 0 && */ r->bufferEmpty() &&  !r->error && !r->eof ) { // ### block on maxSize == 0?
531         qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", this );
532         const TemporaryValue<bool> tmp( d->reader->consumerBlocksOnUs, true );
533         r->bufferNotEmptyCondition.wait( &r->mutex );
534         r->blockedConsumerIsDoneCondition.wakeAll();
535         qDebug( "%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)", this ); 
536     }
537
538     if ( r->bufferEmpty() ) {
539         qDebug( "%p: KDPipeIODevice::readData: got empty buffer, signal eof", this );
540         // woken with an empty buffer must mean either EOF or error:
541         assert( r->eof || r->error );
542         r->eofShortCut = true;
543         return r->eof ? 0 : -1 ;
544     }
545
546     qDebug( "%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes", this, maxSize );
547     const qint64 bytesRead = r->readData( data, maxSize );
548     qDebug( "%p: KDPipeIODevice::readData: read %lld bytes", this, bytesRead );
549     qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, d->fd, data );
550  
551     return bytesRead;
552 }
553
554 qint64 Reader::readData( char * data, qint64 maxSize ) {
555     qint64 numRead = rptr < wptr ? wptr - rptr : sizeof buffer - rptr ;
556     if ( numRead > maxSize )
557         numRead = maxSize;
558
559     qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld", this,
560             data, maxSize, rptr, wptr, bytesInBuffer(), numRead );
561
562     std::memcpy( data, buffer + rptr, numRead );
563
564     rptr = ( rptr + numRead ) % sizeof buffer ;
565
566     if ( !bufferFull() ) {
567         qDebug( "%p: KDPipeIODevice::readData: signal bufferNotFullCondition", this );
568         bufferNotFullCondition.wakeAll();
569     }
570
571     return numRead;
572 }
573
574 qint64 KDPipeIODevice::writeData( const char * data, qint64 size ) { KDAB_CHECK_THIS;
575     d->startWriterThread();
576     Writer * const w = d->writer;
577
578     assert( w );
579     assert( w->error || w->isRunning() );
580     assert( data || size == 0 );
581     assert( size >= 0 );
582
583     LOCKED( w );
584
585     while ( !w->error && !w->bufferEmpty() ) { 
586         qDebug( "%p: KDPipeIODevice::writeData: wait for empty buffer", this );
587         w->bufferEmptyCondition.wait( &w->mutex );
588         qDebug( "%p: KDPipeIODevice::writeData: empty buffer signaled", this );
589
590     }
591     if ( w->error )
592         return -1;
593
594     assert( w->bufferEmpty() );
595
596     return w->writeData( data, size );
597 }
598
599 qint64 Writer::writeData( const char * data, qint64 size ) {
600     assert( bufferEmpty() );
601
602     if ( size > static_cast<qint64>( sizeof buffer ) )
603         size = sizeof buffer;
604
605     std::memcpy( buffer, data, size );
606     
607     numBytesInBuffer = size;
608
609     if ( !bufferEmpty() ) {
610         bufferNotEmptyCondition.wakeAll();
611     }
612    return size;
613 }
614
615 void KDPipeIODevice::Private::stopThreads()
616 {
617     if ( triedToStartWriter )
618     {
619         if ( writer && q->bytesToWrite() > 0 )
620             q->waitForBytesWritten( -1 );
621
622         assert( q->bytesToWrite() == 0 );
623     }
624     if ( Reader * & r = reader ) {
625         disconnect( r, SIGNAL( readyRead() ), this, SLOT( emitReadyRead() ) ); 
626         synchronized( r ) {
627             // tell thread to cancel:
628             r->cancel = true;
629             // and wake it, so it can terminate:
630             r->waitForCancelCondition.wakeAll();
631             r->bufferNotFullCondition.wakeAll();
632             r->readyReadSentCondition.wakeAll();
633         }
634     }
635     if ( Writer * & w = writer ) {
636         synchronized( w ) {
637             // tell thread to cancel:
638             w->cancel = true;
639             // and wake it, so it can terminate:
640             w->bufferNotEmptyCondition.wakeAll();
641         }
642     }
643 }
644
645 void KDPipeIODevice::close() { KDAB_CHECK_THIS;
646     qDebug( "KDPipeIODevice::close(%p)", this );
647     if ( !isOpen() )
648         return;
649
650     // tell clients we're about to close:
651     emit aboutToClose();
652     d->stopThreads();
653
654 #define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = 0; delete t2; }
655     qDebug( "KPipeIODevice::close(%p): wait and closing writer %p", this, d->writer );
656     waitAndDelete( d->writer );
657     qDebug( "KPipeIODevice::close(%p): wait and closing reader %p", this, d->reader );
658     if ( d->reader ) {
659         LOCKED( d->reader );
660         d->reader->readyReadSentCondition.wakeAll();
661     }
662     waitAndDelete( d->reader );
663 #undef waitAndDelete
664 #ifdef Q_OS_WIN32
665     qDebug( "Closing handle" );
666     CloseHandle( d->handle );
667 #else
668     ::close( d->fd );
669 #endif
670
671     setOpenMode( NotOpen );
672     d->fd = -1;
673     d->handle = 0;
674 }
675
676 void Reader::run() {
677
678     LOCKED( this );
679
680     // too bad QThread doesn't have that itself; a signal isn't enough
681     hasStarted.wakeAll();
682
683     qDebug( "%p: Reader::run: started", this );
684
685     while ( true ) {
686         if ( !cancel && ( eof || error ) ) {
687             //notify the client until the buffer is empty and then once 
688             //again so he receives eof/error. After that, wait for him 
689             //to cancel 
690             const bool wasEmpty = bufferEmpty();
691             qDebug( "%p: Reader::run: received eof(%d) or error(%d), waking everyone", this, eof, error );
692             notifyReadyRead();
693             if ( !cancel && wasEmpty ) 
694                 waitForCancelCondition.wait( &mutex );
695         } else if ( !cancel && !bufferFull() && !bufferEmpty() ) {
696             qDebug( "%p: Reader::run: buffer no longer empty, waking everyone", this );
697             notifyReadyRead();
698         } 
699  
700         while ( !cancel && !error && bufferFull() ) {
701             notifyReadyRead();
702             if ( bufferFull() ) {
703                 qDebug( "%p: Reader::run: buffer is full, going to sleep", this );
704                 bufferNotFullCondition.wait( &mutex );
705             }
706         }
707         
708         if ( cancel ) {
709             qDebug( "%p: Reader::run: detected cancel", this );
710             goto leave;
711         }
712
713         if ( !eof && !error ) {
714             if ( rptr == wptr ) // optimize for larger chunks in case the buffer is empty
715                 rptr = wptr = 0;
716
717             unsigned int numBytes = ( rptr + sizeof buffer - wptr - 1 ) % sizeof buffer;
718             if ( numBytes > sizeof buffer - wptr )
719                 numBytes = sizeof buffer - wptr;
720
721             qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", this, rptr, wptr, numBytes );
722
723             assert( numBytes > 0 );
724
725             qDebug( "%p: Reader::run: trying to read %d bytes", this, numBytes );
726 #ifdef Q_OS_WIN32
727             mutex.unlock();
728             DWORD numRead;
729             const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 );
730             mutex.lock();
731             if ( ok ) {
732                 if ( numRead == 0 ) {
733                     qDebug( "%p: Reader::run: got eof (numRead==0)", this );
734                     eof = true;
735                 } 
736             } else { // !ok
737                 errorCode = static_cast<int>( GetLastError() );
738                 if ( errorCode == ERROR_BROKEN_PIPE ) {
739                     assert( numRead == 0 );
740                     qDebug( "%p: Reader::run: got eof (broken pipe)", this );
741                     eof = true;
742                 } else {
743                     assert( numRead == 0 );
744                     qDebug( "%p: Reader::run: got error: %s (%d)", this, strerror( errorCode ), errorCode );
745                     error = true;
746                 }
747             }
748 #else
749             qint64 numRead;
750             mutex.unlock();
751             do {
752                 numRead = ::read( fd, buffer + wptr, numBytes );
753             } while ( numRead == -1 && errno == EINTR );
754             mutex.lock();
755
756             if ( numRead < 0 ) {
757                 errorCode = errno;
758                 error = true;
759                 qDebug( "%p: Reader::run: got error: %d", this, errorCode );
760             } else if ( numRead == 0 ) {
761                 qDebug( "%p: Reader::run: eof detected", this );  
762                 eof = true;
763             }
764 #endif
765             qDebug( "%p: Reader::run: read %ld bytes", this, static_cast<long>(numRead) );
766             qDebug( "%p: Reader::run(fd=%d): %s", this, fd, buffer );
767
768             if ( numRead > 0 ) {
769                 qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", this, rptr, wptr );
770                 wptr = ( wptr + numRead ) % sizeof buffer;
771                 qDebug( "%p: Reader::run: buffer after:  rptr=%4d, wptr=%4d", this, rptr, wptr );
772             }
773         }
774     }
775  leave:
776     qDebug( "%p: Reader::run: terminated", this );
777 }
778
779 void Reader::notifyReadyRead()
780 {
781     qDebug( "notifyReadyRead: %d bytes available", bytesInBuffer() );
782     assert( !cancel );
783
784     if ( consumerBlocksOnUs ) {
785         bufferNotEmptyCondition.wakeAll();
786         blockedConsumerIsDoneCondition.wait( &mutex );
787         return;
788     }
789     qDebug( "notifyReadyRead: emit signal" );
790     emit readyRead();
791     readyReadSentCondition.wait( &mutex );
792     qDebug( "notifyReadyRead: returning from waiting, leave" );
793 }
794
795 void Writer::run() {
796
797     LOCKED( this );
798
799     // too bad QThread doesn't have that itself; a signal isn't enough
800     hasStarted.wakeAll();
801
802     qDebug( "%p: Writer::run: started", this );
803
804     while ( true ) {
805
806         while ( !cancel && bufferEmpty() ) {
807             qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
808             bufferEmptyCondition.wakeAll();
809             emit bytesWritten( 0 );
810             qDebug( "%p: Writer::run: buffer is empty, going to sleep", this );
811             bufferNotEmptyCondition.wait( &mutex );
812             qDebug( "%p: Writer::run: woke up", this );
813         }
814
815         if ( cancel ) {
816             qDebug( "%p: Writer::run: detected cancel", this );
817             goto leave;
818         }
819
820         assert( numBytesInBuffer > 0 );
821
822         qDebug( "%p: Writer::run: Trying to write %u bytes", this, numBytesInBuffer );
823         qint64 totalWritten = 0;
824         do {  
825             mutex.unlock();
826 #ifdef Q_OS_WIN32
827             DWORD numWritten;
828             qDebug( "%p (fd=%d): Writer::run: buffer before WriteFile (numBytes=%lld): %s:", this, fd, numBytesInBuffer, buffer ); 
829             qDebug( "%p (fd=%d): Writer::run: Going into WriteFile", this, fd );
830             if ( !WriteFile( handle, buffer + totalWritten, numBytesInBuffer - totalWritten, &numWritten, 0 ) ) {
831                 mutex.lock();
832                 errorCode = static_cast<int>( GetLastError() );
833                 qDebug( "%p: Writer::run: got error code: %d", this, errorCode );
834                 error = true;
835                 goto leave;
836             }
837 #else
838             qint64 numWritten;
839             do {
840                 numWritten = ::write( fd, buffer + totalWritten, numBytesInBuffer - totalWritten );
841             } while ( numWritten == -1 && errno == EINTR );
842
843             if ( numWritten < 0 ) {
844                 mutex.lock();
845                 errorCode = errno;
846                 qDebug( "%p: Writer::run: got error code: %d", this, errorCode );
847                 error = true;
848                 goto leave;
849             }
850 #endif
851             qDebug( "%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%u): %s:", this, fd, numBytesInBuffer, buffer );
852             totalWritten += numWritten;
853             mutex.lock();
854         } while ( totalWritten < numBytesInBuffer );
855
856         qDebug( "%p: Writer::run: wrote %lld bytes", this, totalWritten );
857
858         numBytesInBuffer = 0;
859
860         qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
861         bufferEmptyCondition.wakeAll();
862         emit bytesWritten( totalWritten );
863     }
864  leave:
865     qDebug( "%p: Writer::run: terminating", this );
866     numBytesInBuffer = 0;
867     qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
868     bufferEmptyCondition.wakeAll();
869     emit bytesWritten( 0 );
870 }
871
872 // static 
873 std::pair<KDPipeIODevice*,KDPipeIODevice*> KDPipeIODevice::makePairOfConnectedPipes() {
874     KDPipeIODevice * read = 0;
875     KDPipeIODevice * write = 0;
876 #ifdef Q_OS_WIN32
877     HANDLE rh;
878     HANDLE wh;
879     SECURITY_ATTRIBUTES sa;
880     memset( &sa, 0, sizeof(sa) );
881     sa.nLength = sizeof(sa);
882     sa.bInheritHandle = TRUE;
883         if ( CreatePipe( &rh, &wh, &sa, BUFFER_SIZE ) ) {
884         read = new KDPipeIODevice;
885         read->open( rh, ReadOnly );
886         write = new KDPipeIODevice;
887         write->open( wh, WriteOnly );
888     }
889 #else
890     int fds[2];
891     if ( pipe( fds ) == 0 ) {
892         read = new KDPipeIODevice;
893         read->open( fds[0], ReadOnly );
894         write = new KDPipeIODevice;
895         write->open( fds[1], WriteOnly );
896     }
897 #endif
898     return std::make_pair( read, write );
899 }
900
901 #ifdef KDAB_DEFINE_CHECKS
902 KDAB_DEFINE_CHECKS( KDPipeIODevice ) {
903     if ( !isOpen() ) {
904         assert( openMode() == NotOpen );
905         assert( !d->reader );
906         assert( !d->writer );
907 #ifdef Q_OS_WIN32
908         assert( !d->handle );
909 #else
910         assert( d->fd < 0 );
911 #endif
912     } else {
913         assert( openMode() != NotOpen );
914         assert( openMode() & ReadWrite );
915         if ( openMode() & ReadOnly ) {
916             assert( d->reader );
917             synchronized( d->reader )
918                 assert( d->reader->eof || d->reader->error || d->reader->isRunning() );
919         }
920         if ( openMode() & WriteOnly ) {
921             assert( d->writer );
922             synchronized( d->writer )
923                 assert( d->writer->error || d->writer->isRunning() );
924         }
925 #ifdef Q_OS_WIN32
926         assert( d->handle );
927 #else
928         assert( d->fd >= 0 );
929 #endif
930     }
931 }
932 #endif // KDAB_DEFINE_CHECKS
933
934 #include "moc_kdpipeiodevice.cpp"
935 #include "kdpipeiodevice.moc"