Enable FD passing and thus building of the UI-server.
[gpgme.git] / src / kdpipeiodevice.cpp
1 /*
2   Copyright (C) 2007 Klarälvdalens Datakonsult AB
3
4   KDPipeIODevice is free software; you can redistribute it and/or
5   modify it under the terms of the GNU Library General Public
6   License as published by the Free Software Foundation; either
7   version 2 of the License, or (at your option) any later version.
8
9   KDPipeIODevice is distributed in the hope that it will be useful,
10   but WITHOUT ANY WARRANTY; without even the implied warranty of
11   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12   GNU Library General Public License for more details.
13
14   You should have received a copy of the GNU Library General Public License
15   along with KDPipeIODevice; see the file COPYING.LIB.  If not, write to the
16   Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
17   Boston, MA 02110-1301, USA.
18 */
19
20 #include "kdpipeiodevice.h"
21
22 #include <QtCore>
23
24 #include <cassert>
25 #include <memory>
26 #include <algorithm>
27
28 #ifdef Q_OS_WIN32
29 # ifndef NOMINMAX
30 #  define NOMINMAX
31 # endif
32 # include <windows.h>
33 # include <io.h>
34 #else
35 # include <unistd.h>
36 # include <errno.h>
37 #endif
38
39 using namespace _gpgme_;
40
41 #ifndef KDAB_CHECK_THIS
42 # define KDAB_CHECK_CTOR (void)1
43 # define KDAB_CHECK_DTOR KDAB_CHECK_CTOR
44 # define KDAB_CHECK_THIS KDAB_CHECK_CTOR
45 #endif
46
47 #define LOCKED( d ) const QMutexLocker locker( &d->mutex )
48 #define synchronized( d ) if ( int i = 0 ) {} else for ( const QMutexLocker locker( &d->mutex ) ; !i ; ++i )
49
50 const unsigned int BUFFER_SIZE = 4096;
51 const bool ALLOW_QIODEVICE_BUFFERING = true;
52
53 // comment to get trace output:
54 //#define qDebug if(1){}else qDebug
55
56 namespace {
57 class Reader : public QThread {
58     Q_OBJECT
59 public:
60     Reader( int fd, Qt::HANDLE handle );
61     ~Reader();
62
63     qint64 readData( char * data, qint64 maxSize );
64
65     unsigned int bytesInBuffer() const {
66         return ( wptr + sizeof buffer - rptr ) % sizeof buffer ;
67     }
68
69     bool bufferFull() const {
70         return bytesInBuffer() == sizeof buffer - 1;
71     }
72
73     bool bufferEmpty() const {
74         return bytesInBuffer() == 0;
75     }
76
77     bool bufferContains( char ch ) {
78         const unsigned int bib = bytesInBuffer();
79         for ( unsigned int i = rptr ; i < rptr + bib ; ++i )
80             if ( buffer[i%sizeof buffer] == ch )
81                 return true;
82         return false;
83     }
84         
85     void notifyReadyRead();
86
87 Q_SIGNALS:
88     void readyRead();
89
90 protected:
91     /* reimp */ void run();
92
93 private:
94     int fd;
95     Qt::HANDLE handle;
96 public:
97     QMutex mutex;
98     QWaitCondition waitForCancelCondition;
99     QWaitCondition bufferNotFullCondition;
100     QWaitCondition bufferNotEmptyCondition;
101     QWaitCondition hasStarted;
102     QWaitCondition readyReadSentCondition;
103     QWaitCondition blockedConsumerIsDoneCondition;
104     bool cancel;
105     bool eof;
106     bool error;
107     bool eofShortCut;
108     int errorCode;
109     bool isReading;
110     bool consumerBlocksOnUs;
111    
112 private:
113     unsigned int rptr, wptr;
114     char buffer[BUFFER_SIZE+1]; // need to keep one byte free to detect empty state
115 };
116
117
118 Reader::Reader( int fd_, Qt::HANDLE handle_ )
119     : QThread(),
120       fd( fd_ ),
121       handle( handle_ ),
122       mutex(),
123       bufferNotFullCondition(),
124       bufferNotEmptyCondition(),
125       hasStarted(),
126       cancel( false ),
127       eof( false ),
128       error( false ),
129       eofShortCut( false ),
130       errorCode( 0 ),
131       isReading( false ),
132       consumerBlocksOnUs( false ),
133       rptr( 0 ), wptr( 0 )
134 {
135     
136 }
137
138 Reader::~Reader() {}
139
140
141 class Writer : public QThread {
142     Q_OBJECT
143 public:
144     Writer( int fd, Qt::HANDLE handle );
145     ~Writer();
146
147     qint64 writeData( const char * data, qint64 size );
148
149     unsigned int bytesInBuffer() const { return numBytesInBuffer; }
150
151     bool bufferFull() const {
152         return numBytesInBuffer == sizeof buffer;
153     }
154
155     bool bufferEmpty() const {
156         return numBytesInBuffer == 0;
157     }
158
159 Q_SIGNALS:
160     void bytesWritten( qint64 );
161
162 protected:
163     /* reimp */ void run();
164
165 private:
166     int fd;
167     Qt::HANDLE handle;
168 public:
169     QMutex mutex;
170     QWaitCondition bufferEmptyCondition;
171     QWaitCondition bufferNotEmptyCondition;
172     QWaitCondition hasStarted;
173     bool cancel;
174     bool error;
175     int errorCode;
176 private:
177     unsigned int numBytesInBuffer;
178     char buffer[BUFFER_SIZE];
179 };
180 }
181
182 Writer::Writer( int fd_, Qt::HANDLE handle_ )
183     : QThread(),
184       fd( fd_ ),
185       handle( handle_ ),
186       mutex(),
187       bufferEmptyCondition(),
188       bufferNotEmptyCondition(),
189       hasStarted(),
190       cancel( false ),
191       error( false ),
192       errorCode( 0 ),
193       numBytesInBuffer( 0 )
194 {
195
196 }
197
198 Writer::~Writer() {}
199
200
201 class KDPipeIODevice::Private : public QObject {
202 Q_OBJECT
203     friend class ::KDPipeIODevice;
204     KDPipeIODevice * const q;
205 public:
206     explicit Private( KDPipeIODevice * qq );
207     ~Private();
208
209     bool doOpen( int, Qt::HANDLE, OpenMode );
210     bool startReaderThread(); 
211     bool startWriterThread(); 
212     void stopThreads();
213
214 public Q_SLOTS:
215     void emitReadyRead();
216  
217 private:
218     int fd;
219     Qt::HANDLE handle;
220     Reader * reader;
221     Writer * writer;
222     bool triedToStartReader;
223     bool triedToStartWriter;
224 };
225
226 KDPipeIODevice::Private::Private( KDPipeIODevice * qq )
227     : QObject( qq ), q( qq ),
228       fd( -1 ),
229       handle( 0 ),
230       reader( 0 ),
231       writer( 0 ),
232       triedToStartReader( false ), triedToStartWriter( false ) 
233 {
234
235 }
236
237 KDPipeIODevice::Private::~Private() {
238     qDebug( "KDPipeIODevice::~Private(): Destroying %p", q );
239 }
240
241 KDPipeIODevice::KDPipeIODevice( QObject * p )
242     : QIODevice( p ), d( new Private( this ) )
243 {
244     KDAB_CHECK_CTOR;
245 }
246
247 KDPipeIODevice::KDPipeIODevice( int fd, OpenMode mode, QObject * p )
248     : QIODevice( p ), d( new Private( this ) )
249 {
250     KDAB_CHECK_CTOR;
251     open( fd, mode );
252 }
253
254 KDPipeIODevice::KDPipeIODevice( Qt::HANDLE handle, OpenMode mode, QObject * p )
255     : QIODevice( p ), d( new Private( this ) )
256 {
257     KDAB_CHECK_CTOR;
258     open( handle, mode );
259 }
260
261 KDPipeIODevice::~KDPipeIODevice() { KDAB_CHECK_DTOR;
262     if ( isOpen() )
263         close();
264     delete d; d = 0;
265 }
266
267
268 bool KDPipeIODevice::open( int fd, OpenMode mode ) { KDAB_CHECK_THIS;
269
270 #ifdef Q_OS_WIN32
271     return d->doOpen( fd, (HANDLE)_get_osfhandle( fd ), mode );
272 #else
273     return d->doOpen( fd, 0, mode );
274 #endif
275
276 }
277
278 bool KDPipeIODevice::open( Qt::HANDLE h, OpenMode mode ) { KDAB_CHECK_THIS;
279
280 #ifdef Q_OS_WIN32
281     return d->doOpen( -1, h, mode );
282 #else
283     Q_UNUSED( h );
284     Q_UNUSED( mode );
285     assert( !"KDPipeIODevice::open( Qt::HANDLE, OpenMode ) should never be called except on Windows." );
286 #endif
287
288 }
289
290 bool KDPipeIODevice::Private::startReaderThread()
291 {
292    if ( triedToStartReader )
293        return true;
294    triedToStartReader = true;    
295    if ( reader && !reader->isRunning() && !reader->isFinished() ) {
296        qDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)" );
297        LOCKED( reader );
298        qDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)" );
299        reader->start( QThread::HighestPriority );
300        qDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)" );
301        const bool hasStarted = reader->hasStarted.wait( &reader->mutex, 1000 );
302        qDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)" );
303
304        return hasStarted;
305    }
306    return true;
307 }
308
309 bool KDPipeIODevice::Private::startWriterThread()
310 {
311    if ( triedToStartWriter )
312        return true;
313    triedToStartWriter = true;    
314    if ( writer && !writer->isRunning() && !writer->isFinished() ) {
315        LOCKED( writer );
316        
317        writer->start( QThread::HighestPriority );
318        if ( !writer->hasStarted.wait( &writer->mutex, 1000 ) )
319             return false;
320    }
321    return true;
322 }
323
324 void KDPipeIODevice::Private::emitReadyRead()
325 {
326     QPointer<Private> thisPointer( this );
327     qDebug( "KDPipeIODevice::Private::emitReadyRead %p", this );
328
329     emit q->readyRead();
330
331     if ( !thisPointer )
332         return;
333
334     bool mustNotify = false;
335
336     if ( reader ) {
337         qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locking reader (CONSUMER THREAD)", this );
338         synchronized( reader ) {
339             qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locked reader (CONSUMER THREAD)", this );
340             reader->readyReadSentCondition.wakeAll();
341             mustNotify = !reader->bufferEmpty() && reader->isReading;
342             qDebug( "KDPipeIODevice::emitReadyRead %p: bufferEmpty: %d reader in ReadFile: %d", this, reader->bufferEmpty(), reader->isReading );
343         }
344     }
345     if ( mustNotify )
346         QTimer::singleShot( 100, this, SLOT( emitReadyRead() ) );  
347     qDebug( "KDPipeIODevice::Private::emitReadyRead %p leaving", this );
348
349 }
350
351 bool KDPipeIODevice::Private::doOpen( int fd_, Qt::HANDLE handle_, OpenMode mode_ ) {
352
353     if ( q->isOpen() )
354         return false;
355
356 #ifdef Q_OS_WIN32
357     if ( !handle_ )
358         return false;
359 #else
360     if ( fd_ < 0 )
361         return false;
362 #endif
363
364     if ( !(mode_ & ReadWrite) )
365         return false; // need to have at least read -or- write
366
367
368     std::auto_ptr<Reader> reader_;
369     std::auto_ptr<Writer> writer_;
370
371     if ( mode_ & ReadOnly ) {
372         reader_.reset( new Reader( fd_, handle_ ) );
373         qDebug( "KDPipeIODevice::doOpen (%p): created reader (%p) for fd %d", this, reader_.get(), fd_ ); 
374         connect( reader_.get(), SIGNAL(readyRead()), this, SLOT(emitReadyRead()), 
375 Qt::QueuedConnection );
376     }
377     if ( mode_ & WriteOnly ) {
378         writer_.reset( new Writer( fd_, handle_ ) );
379         qDebug( "KDPipeIODevice::doOpen (%p): created writer (%p) for fd %d", this, writer_.get(), fd_ ); 
380         connect( writer_.get(), SIGNAL(bytesWritten(qint64)), q, SIGNAL(bytesWritten(qint64)), 
381 Qt::QueuedConnection );
382     }
383
384     // commit to *this:
385     fd = fd_;
386     handle = handle_;
387     reader = reader_.release();
388     writer = writer_.release();
389
390     q->setOpenMode( mode_|Unbuffered );
391     return true;
392 }
393
394 int KDPipeIODevice::descriptor() const { KDAB_CHECK_THIS;
395     return d->fd;
396 }
397
398
399 Qt::HANDLE KDPipeIODevice::handle() const { KDAB_CHECK_THIS;
400     return d->handle;
401 }
402
403 qint64 KDPipeIODevice::bytesAvailable() const { KDAB_CHECK_THIS;
404     const qint64 base = QIODevice::bytesAvailable();
405     if ( !d->triedToStartReader ) {
406          d->startReaderThread();
407          return base;
408     }
409     if ( d->reader )
410         synchronized( d->reader ) {
411             const qint64 inBuffer = d->reader->bytesInBuffer();     
412             return base + inBuffer;
413        }
414     return base;
415 }
416
417 qint64 KDPipeIODevice::bytesToWrite() const { KDAB_CHECK_THIS;
418     d->startWriterThread();
419     const qint64 base = QIODevice::bytesToWrite();
420     if ( d->writer )
421         synchronized( d->writer ) return base + d->writer->bytesInBuffer();
422     return base;
423 }
424
425 bool KDPipeIODevice::canReadLine() const { KDAB_CHECK_THIS;
426     d->startReaderThread();
427    if ( QIODevice::canReadLine() )
428         return true;
429     if ( d->reader )
430         synchronized( d->reader ) return d->reader->bufferContains( '\n' );
431     return true;
432 }
433
434 bool KDPipeIODevice::isSequential() const {
435     return true;
436 }
437
438 bool KDPipeIODevice::atEnd() const { KDAB_CHECK_THIS;
439     d->startReaderThread();
440     if ( !QIODevice::atEnd() ) {
441         qDebug( "%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", this, static_cast<long>(bytesAvailable()) );
442         return false;
443     }
444     if ( !isOpen() )
445         return true;
446     if ( d->reader->eofShortCut )
447         return true;
448     LOCKED( d->reader );
449     const bool eof = ( d->reader->error || d->reader->eof ) && d->reader->bufferEmpty();
450     if ( !eof ) {
451         if ( !d->reader->error && !d->reader->eof )
452             qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof", this );
453         if ( !d->reader->bufferEmpty() )
454             qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()", this );
455     }
456     return eof;
457 }
458
459 bool KDPipeIODevice::waitForBytesWritten( int msecs ) { KDAB_CHECK_THIS;
460     d->startWriterThread();
461     Writer * const w = d->writer;
462     if ( !w )
463         return true;
464     LOCKED( w );
465     qDebug( "KDPipeIODevice::waitForBytesWritten (%p,w=%p): entered locked area", this, w 
466 ); 
467     return w->bufferEmpty() || w->error || w->bufferEmptyCondition.wait( &w->mutex, msecs ) ;
468 }
469
470 bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS;
471     qDebug( "KDPipeIODEvice::waitForReadyRead()(%p)", this);
472     d->startReaderThread();
473     if ( ALLOW_QIODEVICE_BUFFERING ) {
474         if ( bytesAvailable() > 0 )
475             return true;
476     }
477     Reader * const r = d->reader;
478     if ( !r || r->eofShortCut )
479         return true;
480     LOCKED( r );
481     if ( r->bytesInBuffer() != 0 || r->eof || r->error )
482         return true;
483
484     return msecs >= 0 ? r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) : r->bufferNotEmptyCondition.wait( &r->mutex );
485 }
486
487 template <typename T>
488 class TemporaryValue {
489 public:
490    TemporaryValue( T& var_, const T& tv ) : var( var_ ), oldValue( var_ ) { var = tv; }
491    ~TemporaryValue() { var = oldValue; }
492 private:   
493    T& var;
494    const T oldValue;
495 }; 
496
497
498 bool KDPipeIODevice::readWouldBlock() const
499 {
500    d->startReaderThread();
501    LOCKED( d->reader );
502    return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error;
503 }  
504
505 bool KDPipeIODevice::writeWouldBlock() const
506 {
507    d->startWriterThread();
508    LOCKED( d->writer );
509    return !d->writer->bufferEmpty() && !d->writer->error;
510 }  
511
512
513 qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS;
514     qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", this, data, maxSize );
515     d->startReaderThread();
516     Reader * const r = d->reader;
517
518     assert( r );
519
520
521     //assert( r->isRunning() ); // wrong (might be eof, error)
522     assert( data || maxSize == 0 );
523     assert( maxSize >= 0 );
524
525     if ( r->eofShortCut ) {
526         qDebug( "%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", this );
527         return 0;
528     }
529
530     if ( maxSize < 0 )
531         maxSize = 0;
532
533     if ( ALLOW_QIODEVICE_BUFFERING ) {
534         if ( bytesAvailable() > 0 )
535             maxSize = std::min( maxSize, bytesAvailable() ); // don't block
536     }
537     qDebug( "%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)", this );
538     LOCKED( r );
539     qDebug( "%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)", this );
540
541     r->readyReadSentCondition.wakeAll();
542     if ( /* maxSize > 0 && */ r->bufferEmpty() &&  !r->error && !r->eof ) { // ### block on maxSize == 0?
543         qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", this );
544         const TemporaryValue<bool> tmp( d->reader->consumerBlocksOnUs, true );
545         r->bufferNotEmptyCondition.wait( &r->mutex );
546         r->blockedConsumerIsDoneCondition.wakeAll();
547         qDebug( "%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)", this ); 
548     }
549
550     if ( r->bufferEmpty() ) {
551         qDebug( "%p: KDPipeIODevice::readData: got empty buffer, signal eof", this );
552         // woken with an empty buffer must mean either EOF or error:
553         assert( r->eof || r->error );
554         r->eofShortCut = true;
555         return r->eof ? 0 : -1 ;
556     }
557
558     qDebug( "%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes", this, maxSize );
559     const qint64 bytesRead = r->readData( data, maxSize );
560     qDebug( "%p: KDPipeIODevice::readData: read %lld bytes", this, bytesRead );
561     qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", this, d->fd, data );
562  
563     return bytesRead;
564 }
565
566 qint64 Reader::readData( char * data, qint64 maxSize ) {
567     qint64 numRead = rptr < wptr ? wptr - rptr : sizeof buffer - rptr ;
568     if ( numRead > maxSize )
569         numRead = maxSize;
570
571     qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld", this,
572             data, maxSize, rptr, wptr, bytesInBuffer(), numRead );
573
574     std::memcpy( data, buffer + rptr, numRead );
575
576     rptr = ( rptr + numRead ) % sizeof buffer ;
577
578     if ( !bufferFull() ) {
579         qDebug( "%p: KDPipeIODevice::readData: signal bufferNotFullCondition", this );
580         bufferNotFullCondition.wakeAll();
581     }
582
583     return numRead;
584 }
585
586 qint64 KDPipeIODevice::writeData( const char * data, qint64 size ) { KDAB_CHECK_THIS;
587     d->startWriterThread();
588     Writer * const w = d->writer;
589
590     assert( w );
591     assert( w->error || w->isRunning() );
592     assert( data || size == 0 );
593     assert( size >= 0 );
594
595     LOCKED( w );
596
597     while ( !w->error && !w->bufferEmpty() ) { 
598         qDebug( "%p: KDPipeIODevice::writeData: wait for empty buffer", this );
599         w->bufferEmptyCondition.wait( &w->mutex );
600         qDebug( "%p: KDPipeIODevice::writeData: empty buffer signaled", this );
601
602     }
603     if ( w->error )
604         return -1;
605
606     assert( w->bufferEmpty() );
607
608     return w->writeData( data, size );
609 }
610
611 qint64 Writer::writeData( const char * data, qint64 size ) {
612     assert( bufferEmpty() );
613
614     if ( size > static_cast<qint64>( sizeof buffer ) )
615         size = sizeof buffer;
616
617     std::memcpy( buffer, data, size );
618     
619     numBytesInBuffer = size;
620
621     if ( !bufferEmpty() ) {
622         bufferNotEmptyCondition.wakeAll();
623     }
624    return size;
625 }
626
627 void KDPipeIODevice::Private::stopThreads()
628 {
629     if ( triedToStartWriter )
630     {
631         if ( writer && q->bytesToWrite() > 0 )
632             q->waitForBytesWritten( -1 );
633
634         assert( q->bytesToWrite() == 0 );
635     }
636     if ( Reader * & r = reader ) {
637         disconnect( r, SIGNAL( readyRead() ), this, SLOT( emitReadyRead() ) ); 
638         synchronized( r ) {
639             // tell thread to cancel:
640             r->cancel = true;
641             // and wake it, so it can terminate:
642             r->waitForCancelCondition.wakeAll();
643             r->bufferNotFullCondition.wakeAll();
644             r->readyReadSentCondition.wakeAll();
645         }
646     }
647     if ( Writer * & w = writer ) {
648         synchronized( w ) {
649             // tell thread to cancel:
650             w->cancel = true;
651             // and wake it, so it can terminate:
652             w->bufferNotEmptyCondition.wakeAll();
653         }
654     }
655 }
656
657 void KDPipeIODevice::close() { KDAB_CHECK_THIS;
658     qDebug( "KDPipeIODevice::close(%p)", this );
659     if ( !isOpen() )
660         return;
661
662     // tell clients we're about to close:
663     emit aboutToClose();
664     d->stopThreads();
665
666 #define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = 0; delete t2; }
667     qDebug( "KPipeIODevice::close(%p): wait and closing writer %p", this, d->writer );
668     waitAndDelete( d->writer );
669     qDebug( "KPipeIODevice::close(%p): wait and closing reader %p", this, d->reader );
670     if ( d->reader ) {
671         LOCKED( d->reader );
672         d->reader->readyReadSentCondition.wakeAll();
673     }
674     waitAndDelete( d->reader );
675 #undef waitAndDelete
676 #ifdef Q_OS_WIN32
677     if ( d->fd != -1 )
678         _close( d->fd );
679     else
680         CloseHandle( d->handle );
681 #else
682     ::close( d->fd );
683 #endif
684
685     setOpenMode( NotOpen );
686     d->fd = -1;
687     d->handle = 0;
688 }
689
690 void Reader::run() {
691
692     LOCKED( this );
693
694     // too bad QThread doesn't have that itself; a signal isn't enough
695     hasStarted.wakeAll();
696
697     qDebug( "%p: Reader::run: started", this );
698
699     while ( true ) {
700         if ( !cancel && ( eof || error ) ) {
701             //notify the client until the buffer is empty and then once 
702             //again so he receives eof/error. After that, wait for him 
703             //to cancel 
704             const bool wasEmpty = bufferEmpty();
705             qDebug( "%p: Reader::run: received eof(%d) or error(%d), waking everyone", this, eof, error );
706             notifyReadyRead();
707             if ( !cancel && wasEmpty ) 
708                 waitForCancelCondition.wait( &mutex );
709         } else if ( !cancel && !bufferFull() && !bufferEmpty() ) {
710             qDebug( "%p: Reader::run: buffer no longer empty, waking everyone", this );
711             notifyReadyRead();
712         } 
713  
714         while ( !cancel && !error && bufferFull() ) {
715             notifyReadyRead();
716             if ( !cancel && bufferFull() ) {
717                 qDebug( "%p: Reader::run: buffer is full, going to sleep", this );
718                 bufferNotFullCondition.wait( &mutex );
719             }
720         }
721         
722         if ( cancel ) {
723             qDebug( "%p: Reader::run: detected cancel", this );
724             goto leave;
725         }
726
727         if ( !eof && !error ) {
728             if ( rptr == wptr ) // optimize for larger chunks in case the buffer is empty
729                 rptr = wptr = 0;
730
731             unsigned int numBytes = ( rptr + sizeof buffer - wptr - 1 ) % sizeof buffer;
732             if ( numBytes > sizeof buffer - wptr )
733                 numBytes = sizeof buffer - wptr;
734
735             qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", this, rptr, wptr, numBytes );
736
737             assert( numBytes > 0 );
738
739             qDebug( "%p: Reader::run: trying to read %d bytes", this, numBytes );
740 #ifdef Q_OS_WIN32
741             isReading = true;
742             mutex.unlock();
743             DWORD numRead;
744             const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 );
745             mutex.lock();
746             isReading = false;
747             if ( ok ) {
748                 if ( numRead == 0 ) {
749                     qDebug( "%p: Reader::run: got eof (numRead==0)", this );
750                     eof = true;
751                 } 
752             } else { // !ok
753                 errorCode = static_cast<int>( GetLastError() );
754                 if ( errorCode == ERROR_BROKEN_PIPE ) {
755                     assert( numRead == 0 );
756                     qDebug( "%p: Reader::run: got eof (broken pipe)", this );
757                     eof = true;
758                 } else {
759                     assert( numRead == 0 );
760                     qDebug( "%p: Reader::run: got error: %s (%d)", this, strerror( errorCode ), errorCode );
761                     error = true;
762                 }
763             }
764 #else
765             qint64 numRead;
766             mutex.unlock();
767             do {
768                 numRead = ::read( fd, buffer + wptr, numBytes );
769             } while ( numRead == -1 && errno == EINTR );
770             mutex.lock();
771
772             if ( numRead < 0 ) {
773                 errorCode = errno;
774                 error = true;
775                 qDebug( "%p: Reader::run: got error: %d", this, errorCode );
776             } else if ( numRead == 0 ) {
777                 qDebug( "%p: Reader::run: eof detected", this );  
778                 eof = true;
779             }
780 #endif
781             qDebug( "%p: Reader::run: read %ld bytes", this, static_cast<long>(numRead) );
782             qDebug( "%p: Reader::run(fd=%d): %s", this, fd, buffer );
783
784             if ( numRead > 0 ) {
785                 qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", this, rptr, wptr );
786                 wptr = ( wptr + numRead ) % sizeof buffer;
787                 qDebug( "%p: Reader::run: buffer after:  rptr=%4d, wptr=%4d", this, rptr, wptr );
788             }
789         }
790     }
791  leave:
792     qDebug( "%p: Reader::run: terminated", this );
793 }
794
795 void Reader::notifyReadyRead()
796 {
797     qDebug( "notifyReadyRead: %d bytes available", bytesInBuffer() );
798     assert( !cancel );
799
800     if ( consumerBlocksOnUs ) {
801         bufferNotEmptyCondition.wakeAll();
802         blockedConsumerIsDoneCondition.wait( &mutex );
803         return;
804     }
805     qDebug( "notifyReadyRead: emit signal" );
806     emit readyRead();
807     readyReadSentCondition.wait( &mutex );
808     qDebug( "notifyReadyRead: returning from waiting, leave" );
809 }
810
811 void Writer::run() {
812
813     LOCKED( this );
814
815     // too bad QThread doesn't have that itself; a signal isn't enough
816     hasStarted.wakeAll();
817
818     qDebug( "%p: Writer::run: started", this );
819
820     while ( true ) {
821
822         while ( !cancel && bufferEmpty() ) {
823             qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
824             bufferEmptyCondition.wakeAll();
825             emit bytesWritten( 0 );
826             qDebug( "%p: Writer::run: buffer is empty, going to sleep", this );
827             bufferNotEmptyCondition.wait( &mutex );
828             qDebug( "%p: Writer::run: woke up", this );
829         }
830
831         if ( cancel ) {
832             qDebug( "%p: Writer::run: detected cancel", this );
833             goto leave;
834         }
835
836         assert( numBytesInBuffer > 0 );
837
838         qDebug( "%p: Writer::run: Trying to write %u bytes", this, numBytesInBuffer );
839         qint64 totalWritten = 0;
840         do {  
841             mutex.unlock();
842 #ifdef Q_OS_WIN32
843             DWORD numWritten;
844             qDebug( "%p (fd=%d): Writer::run: buffer before WriteFile (numBytes=%lld): %s:", this, fd, numBytesInBuffer, buffer ); 
845             qDebug( "%p (fd=%d): Writer::run: Going into WriteFile", this, fd );
846             if ( !WriteFile( handle, buffer + totalWritten, numBytesInBuffer - totalWritten, &numWritten, 0 ) ) {
847                 mutex.lock();
848                 errorCode = static_cast<int>( GetLastError() );
849                 qDebug( "%p: Writer::run: got error code: %d", this, errorCode );
850                 error = true;
851                 goto leave;
852             }
853 #else
854             qint64 numWritten;
855             do {
856                 numWritten = ::write( fd, buffer + totalWritten, numBytesInBuffer - totalWritten );
857             } while ( numWritten == -1 && errno == EINTR );
858
859             if ( numWritten < 0 ) {
860                 mutex.lock();
861                 errorCode = errno;
862                 qDebug( "%p: Writer::run: got error code: %d", this, errorCode );
863                 error = true;
864                 goto leave;
865             }
866 #endif
867             qDebug( "%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%u): %s:", this, fd, numBytesInBuffer, buffer );
868             totalWritten += numWritten;
869             mutex.lock();
870         } while ( totalWritten < numBytesInBuffer );
871
872         qDebug( "%p: Writer::run: wrote %lld bytes", this, totalWritten );
873
874         numBytesInBuffer = 0;
875
876         qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
877         bufferEmptyCondition.wakeAll();
878         emit bytesWritten( totalWritten );
879     }
880  leave:
881     qDebug( "%p: Writer::run: terminating", this );
882     numBytesInBuffer = 0;
883     qDebug( "%p: Writer::run: buffer is empty, wake bufferEmptyCond listeners", this );
884     bufferEmptyCondition.wakeAll();
885     emit bytesWritten( 0 );
886 }
887
888 // static 
889 std::pair<KDPipeIODevice*,KDPipeIODevice*> KDPipeIODevice::makePairOfConnectedPipes() {
890     KDPipeIODevice * read = 0;
891     KDPipeIODevice * write = 0;
892 #ifdef Q_OS_WIN32
893     HANDLE rh;
894     HANDLE wh;
895     SECURITY_ATTRIBUTES sa;
896     memset( &sa, 0, sizeof(sa) );
897     sa.nLength = sizeof(sa);
898     sa.bInheritHandle = TRUE;
899         if ( CreatePipe( &rh, &wh, &sa, BUFFER_SIZE ) ) {
900         read = new KDPipeIODevice;
901         read->open( rh, ReadOnly );
902         write = new KDPipeIODevice;
903         write->open( wh, WriteOnly );
904     }
905 #else
906     int fds[2];
907     if ( pipe( fds ) == 0 ) {
908         read = new KDPipeIODevice;
909         read->open( fds[0], ReadOnly );
910         write = new KDPipeIODevice;
911         write->open( fds[1], WriteOnly );
912     }
913 #endif
914     return std::make_pair( read, write );
915 }
916
917 #ifdef KDAB_DEFINE_CHECKS
918 KDAB_DEFINE_CHECKS( KDPipeIODevice ) {
919     if ( !isOpen() ) {
920         assert( openMode() == NotOpen );
921         assert( !d->reader );
922         assert( !d->writer );
923 #ifdef Q_OS_WIN32
924         assert( !d->handle );
925 #else
926         assert( d->fd < 0 );
927 #endif
928     } else {
929         assert( openMode() != NotOpen );
930         assert( openMode() & ReadWrite );
931         if ( openMode() & ReadOnly ) {
932             assert( d->reader );
933             synchronized( d->reader )
934                 assert( d->reader->eof || d->reader->error || d->reader->isRunning() );
935         }
936         if ( openMode() & WriteOnly ) {
937             assert( d->writer );
938             synchronized( d->writer )
939                 assert( d->writer->error || d->writer->isRunning() );
940         }
941 #ifdef Q_OS_WIN32
942         assert( d->handle );
943 #else
944         assert( d->fd >= 0 );
945 #endif
946     }
947 }
948 #endif // KDAB_DEFINE_CHECKS
949
950 #include "moc_kdpipeiodevice.cpp"
951 #include "kdpipeiodevice.moc"