R: R: [Developers] Parallelization of computations

Kristofer Tingdahl kristofer.tingdahl at dgb-group.com
Fri Mar 7 19:58:51 CET 2008


Dear Antonio,
> Graphically:
> output task 1: _******__________ +
> output task 2: _____******______ +
> output task 3: _________******__ +
>
> output attrib: __*************__ =
>
> If I do allow each thread to directly update the output vector, I suspect thread safety is compromised when two threads are changing the same output[k] value.
Assuming that each thread only needs access to the output for a very short
time, you can always protect the output with a mutex, something like this:

const float result = myComputation();
mutex_.lock();
output[k] = result;
mutex_.unLock();

The mutex_ (a Threads::Mutex) is stored as a member of your class.
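
For readers without the OpendTect headers at hand, the same idea can be sketched in standard C++. This is only an illustration under assumptions: std::mutex stands in for Threads::Mutex, and SharedOutput and runOverlappingTasks are hypothetical names, not part of the OpendTect API. Since the task windows in the question overlap and are summed, the sketch accumulates with += under the lock instead of assigning.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the attribute output; not OpendTect API.
struct SharedOutput
{
    explicit SharedOutput( std::size_t sz ) : values_( sz, 0.0f ) {}

    // Adds one contribution to sample k. The lock is held only for the
    // update itself, so threads block each other as briefly as possible.
    void add( std::size_t k, float contribution )
    {
        std::lock_guard<std::mutex> guard( mutex_ );
        values_[k] += contribution;
    }

    std::vector<float> values_;
    std::mutex         mutex_;   // std::mutex assumed in place of Threads::Mutex
};

// Runs one thread per entry in 'starts'. Each thread contributes 1 to the
// 'len' samples beginning at its start index; windows may overlap.
std::vector<float> runOverlappingTasks( std::size_t outsz,
                                        const std::vector<std::size_t>& starts,
                                        std::size_t len )
{
    SharedOutput output( outsz );
    std::vector<std::thread> threads;
    for ( std::size_t start : starts )
    {
        threads.emplace_back( [&output,start,len]
        {
            for ( std::size_t k=start; k<start+len; k++ )
            {
                // Placeholder for the real computation, done outside the lock
                const float result = 1.0f;
                output.add( k, result );
            }
        } );
    }

    for ( std::thread& t : threads )
        t.join();

    return output.values_;
}
```

Calling runOverlappingTasks( 17, {1,5,9}, 6 ) mimics the three overlapping tasks drawn above: samples covered by two tasks receive both contributions, regardless of the order in which the threads run.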

> I guess I have to add a method to the AttribProvider class that is called after all threads have completed their work, to finalize the attrib output.
>
> Hope I manage to do it just deriving a class from AttributeProvider and adding some kind of afterProcessingDone() method, but I haven't examined the code in detail.
This is a very good point, and I have therefore added a
finalizeCalculation function to the attribute provider. It will be called
from the main thread once all calculations are done. However, that function
will only be available in the next release. In the meantime, you can use
the following trick:

Thread 0 waits for all other threads to finish, and then merges their
outputs.

class MyAttrib : public AttribProvider
{
public:
   MyAttrib()
   { threadoutputs_.allowNull(); }
   ~MyAttrib() { deepEraseArr( threadoutputs_ ); }

bool computeData( const DataHolder& output,
                  const BinID& relpos,int t0,
                  int nrsamples,int threadidx )
{
    lock_.lock();
    if ( mIsUdf(calcrg_.start) )
    {
        calcrg_.start = t0;
        calcrg_.stop = t0+nrsamples-1;
    }
    else
    {
       calcrg_.include( t0 );
       calcrg_.include( t0+nrsamples-1 );
    }
    lock_.unLock();

    if ( !threadoutputs_[threadidx] )
        threadoutputs_.replace( threadidx, new float[nrsamples] );

    calcstarts_[threadidx] = t0;

    //Do your calculation here, and store the values in the
    //threadoutputs_[threadidx] array
    const bool success = yourComputation();

    lock_.lock();
    nrunfinishedthreads_ --;
    if ( !success )
        success_ = false;

    if ( !threadidx )
    {
        while ( success_ && nrunfinishedthreads_ )
          lock_.wait();

        if ( success_ )
        {
          //All other threads are finished
          //Take result from threadoutputs_ and merge them into output
        }

    }
    else
       lock_.signal( false ); //Make waiting thread wake up

    lock_.unLock();

    return success;
}


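bool setNrThreads( int nrthreads )
{
    deepEraseArr( threadoutputs_ );
    for ( int idx=0; idx<nrthreads; idx++ )
    { threadoutputs_ += 0; }

    calcstarts_.setSize( nrthreads, mUdf(int) );

    nrunfinishedthreads_ = nrthreads;

    //Reset the shared state that computeData reads before writing
    success_ = true;
    calcrg_.start = calcrg_.stop = mUdf(int);

    return true;
}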


protected:
    Interval<int>      calcrg_;      //needed to merge data
    TypeSet<int>       calcstarts_;   //needed to merge data
    ObjectSet<float>   threadoutputs_;   //the output from each thread
    int                nrunfinishedthreads_; //Is counted down from nrthreads to 0
    bool               success_;
    Threads::ConditionVar lock_;
};
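
The wait-then-merge pattern above can also be tried outside OpendTect. The following is a minimal sketch in standard C++, under assumptions: std::mutex and std::condition_variable stand in for Threads::ConditionVar, computeAndMerge is a hypothetical helper rather than OpendTect API, and each thread simply fills its private buffer with the value threadidx+1 in place of a real computation.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper, not OpendTect API. Each worker fills a private
// buffer of 'len' samples that belongs at starts[threadidx] in the output.
// Worker 0 waits for all others and then sums the buffers into the output.
std::vector<float> computeAndMerge( std::size_t outsz,
                                    const std::vector<std::size_t>& starts,
                                    std::size_t len )
{
    const std::size_t nrthreads = starts.size();
    std::vector<std::vector<float> > threadoutputs( nrthreads );
    std::vector<float> output( outsz, 0.0f );

    std::mutex lock;
    std::condition_variable cond;
    std::size_t nrunfinished = nrthreads;   // counted down to 0

    auto worker = [&]( std::size_t threadidx )
    {
        // Private buffer: no locking needed while computing
        threadoutputs[threadidx].assign( len, float(threadidx+1) );

        std::unique_lock<std::mutex> guard( lock );
        nrunfinished--;
        if ( threadidx != 0 )
        {
            cond.notify_all();   // wake up worker 0 if it is waiting
            return;
        }

        // Worker 0: sleep until every worker has counted itself down
        cond.wait( guard, [&] { return nrunfinished == 0; } );

        // All buffers are complete; merge them into the shared output
        for ( std::size_t idx=0; idx<nrthreads; idx++ )
            for ( std::size_t k=0; k<len; k++ )
                output[starts[idx]+k] += threadoutputs[idx][k];
    };

    std::vector<std::thread> threads;
    for ( std::size_t idx=0; idx<nrthreads; idx++ )
        threads.emplace_back( worker, idx );

    for ( std::thread& t : threads )
        t.join();

    return output;
}
```

Because only worker 0 touches the shared output, and only after the counter has reached zero under the lock, no per-sample locking is needed. The cost is one extra buffer per thread.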



If you have any further questions, don't hesitate to come back to us on
the developers list.


Best regards,


Kristofer

-- 
Dr. Kristofer Tingdahl
dGB Earth Sciences
1 Sugar Creek Center BLVD #935; Sugar Land, TX 77478; USA
+1 281 240 3939





