Errors in Parallel.ForEach

dazedorconfused
Posts: 89
Joined: 06-Apr-2006
Posted on: 28-Feb-2017 20:26:08

I have an application that has to process a very large file from a legacy system. For each record I have to fetch an entity from the database (or create a new one if it doesn't exist) and then update it. I have to do this for 60k+ records several times a day. My method that does the database work accepts a list of POCOs to iterate through and update the database. If I iterate in a plain foreach loop, it takes far too long to process. If I put all the database operations in a Parallel.ForEach, as below, I get all sorts of exceptions (see the examples after the code).


private void ProcessChildLocations(Dictionary<string, ChildLocation> aChildren)
{
    Rollins.Core.DAL.DaoClasses.CommonDaoBase.CommandTimeOut = 120;
    var locations = aChildren.Values;
    Parallel.ForEach(locations, (location) =>
    {
        CustomerLocationEntity locEnt = null;
        try
        {
            // Fetch the existing row, or set the PK on a new entity if none exists.
            locEnt = new CustomerLocationEntity();
            if (!locEnt.FetchUsingPK(location.LocationID))
            {
                locEnt.LocationId = location.LocationID;
            }
            populateLocation(ref locEnt, location);
            // Save(true) saves the entity and its related graph, on this thread-pool thread.
            locEnt.Save(true);
            logger.Debug("Processed Location {0} on Thread {1}", location.LocationID, Thread.CurrentThread.ManagedThreadId);
        }
        catch (Exception ex)
        {
            logger.Error(ex, "The location {0} could not be processed due to an error", location.LocationID);
            return;
        }
    });
}

2017-02-28 10:40:55.5767 | Error | Rollins.DataBrokers.Importers.CustomerImporter | Tuesday | February | The location 01009079 could not be processed due to an error
SD.LLBLGen.Pro.ORMSupportClasses.ORMQueryExecutionException: An exception was caught during the execution of an action query: ExecuteNonQuery requires an open and available Connection. The connection's current state is open.. Check InnerException, QueryExecuted and Parameters of this exception to examine the cause of this exception. ---> System.InvalidOperationException: ExecuteNonQuery requires an open and available Connection. The connection's current state is open.
   at System.Data.SqlClient.SqlConnection.GetOpenConnection(String method)
   at System.Data.SqlClient.SqlConnection.ValidateConnectionForExecute(String method, SqlCommand command)
   at System.Data.SqlClient.SqlCommand.ValidateCommand(String method, Boolean async)
   at System.Data.SqlClient.SqlCommand.InternalExecuteNonQuery(TaskCompletionSource`1 completion, String methodName, Boolean sendToPipe, Int32 timeout, Boolean asyncWrite)
   at System.Data.SqlClient.SqlCommand.ExecuteNonQuery()
   at SD.LLBLGen.Pro.ORMSupportClasses.ActionQuery.Execute() in c:\Myprojects\VS.NET Projects\LLBLGen Pro v3.5\Frameworks\LLBLGen Pro\RuntimeLibraries\ORMSupportClasses\Query\ActionQuery.cs:line 231
   --- End of inner exception stack trace ---
   at SD.LLBLGen.Pro.ORMSupportClasses.ActionQuery.Execute() in c:\Myprojects\VS.NET Projects\LLBLGen Pro v3.5\Frameworks\LLBLGen Pro\RuntimeLibraries\ORMSupportClasses\Query\ActionQuery.cs:line 261
   at SD.LLBLGen.Pro.ORMSupportClasses.BatchActionQuery.Execute() in c:\Myprojects\VS.NET Projects\LLBLGen Pro v3.5\Frameworks\LLBLGen Pro\RuntimeLibraries\ORMSupportClasses\Query\BatchActionQuery.cs:line 112
   at SD.LLBLGen.Pro.ORMSupportClasses.DaoBase.ExecuteActionQuery(IActionQuery queryToExecute, ITransaction containingTransaction) in c:\Myprojects\VS.NET Projects\LLBLGen Pro v3.5\Frameworks\LLBLGen Pro\RuntimeLibraries\ORMSupportClasses\SelfServicingSpecific\DaoBase.cs:line 1436
   at SD.LLBLGen.Pro.ORMSupportClasses.DaoBase.AddNew(IEntityFields fields, ITransaction containingTransaction) in c:\Myprojects\VS.NET Projects\LLBLGen Pro v3.5\Frameworks\LLBLGen Pro\RuntimeLibraries\ORMSupportClasses\SelfServicingSpecific\DaoBase.cs:line 162
   at SD.LLBLGen.Pro.ORMSupportClasses.EntityBase.InsertEntity() in c:\Myprojects\VS.NET Projects\LLBLGen Pro v3.5\Frameworks\LLBLGen Pro\RuntimeLibraries\ORMSupportClasses\SelfServicingSpecific\EntityBase.cs:line 1181
   at SD.LLBLGen.Pro.ORMSupportClasses.DaoBase.PersistQueue(List`1 queueToPersist, Boolean insertActions, ITransaction transactionToUse, Int32& totalAmountSaved) in c:\Myprojects\VS.NET Projects\LLBLGen Pro v3.5\Frameworks\LLBLGen Pro\RuntimeLibraries\ORMSupportClasses\SelfServicingSpecific\DaoBase.cs:line 2073
   at SD.LLBLGen.Pro.ORMSupportClasses.EntityBase.Save(IPredicate updateRestriction, Boolean recurse) in c:\Myprojects\VS.NET Projects\LLBLGen Pro v3.5\Frameworks\LLBLGen Pro\RuntimeLibraries\ORMSupportClasses\SelfServicingSpecific\EntityBase.cs:line 752
   at SD.LLBLGen.Pro.ORMSupportClasses.EntityBase.Save(Boolean recurse) in c:\Myprojects\VS.NET Projects\LLBLGen Pro v3.5\Frameworks\LLBLGen Pro\RuntimeLibraries\ORMSupportClasses\SelfServicingSpecific\EntityBase.cs:line 618
   at Rollins.DataBrokers.Importers.CustomerImporter.<ProcessChildLocations>b__10_0(ChildLocation location) in C:\Dev\Rollins\source\DataBrokers\Importers\CustomerImporter.cs:line 283

If I move the Save calls out of the Parallel.ForEach, so that my method looks like the one below, I get an exception saying that the entity is already participating in a transaction.


private void ProcessChildLocations(Dictionary<string, ChildLocation> aChildren)
{
    Rollins.Core.DAL.DaoClasses.CommonDaoBase.CommandTimeOut = 120;
    // List<T>.Add is not thread-safe, so the entities built in parallel
    // are collected in a ConcurrentBag instead.
    var locsToSave = new System.Collections.Concurrent.ConcurrentBag<CustomerLocationEntity>();
    var locations = aChildren.Values;
    Parallel.ForEach(locations, (location) =>
    {
        CustomerLocationEntity locEnt = null;
        try
        {
            // Fetch and populate in parallel; saving happens afterwards.
            locEnt = new CustomerLocationEntity();
            if (!locEnt.FetchUsingPK(location.LocationID))
            {
                locEnt.LocationId = location.LocationID;
            }
            populateLocation(ref locEnt, location);
            locsToSave.Add(locEnt);
            logger.Debug("Processed Location {0} on Thread {1}", location.LocationID, Thread.CurrentThread.ManagedThreadId);
        }
        catch (Exception ex)
        {
            logger.Error(ex, "The location {0} could not be processed due to an error", location.LocationID);
            return;
        }
    });

    // Save sequentially on the calling thread.
    foreach (var loc in locsToSave)
    {
        try
        {
            loc.Save(true);
        }
        catch (Exception ex)
        {
            logger.Error(ex);
        }
    }
}
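Collecting results from inside Parallel.ForEach needs a thread-safe collection: List<T>.Add is not safe to call from several threads at once, so items can be lost or the list's internal state corrupted. The sketch below uses hypothetical stub types (ChildLocationStub, LocationEntityStub) instead of the generated LLBLGen classes and only shows the collection pattern; it does not address the ORM-level connection or transaction errors discussed in this thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for the POCO and the entity in the post.
public sealed class ChildLocationStub
{
    public string LocationID { get; set; } = "";
}

public sealed class LocationEntityStub
{
    public string LocationId { get; set; } = "";
}

public static class ParallelCollect
{
    // Builds one entity per input on the thread pool and collects them in a
    // ConcurrentBag, which (unlike List<T>) supports concurrent Add calls.
    public static List<LocationEntityStub> BuildAll(IEnumerable<ChildLocationStub> locations)
    {
        var bag = new ConcurrentBag<LocationEntityStub>();
        Parallel.ForEach(locations, location =>
        {
            // Each iteration creates its own instance; nothing is shared across threads.
            var entity = new LocationEntityStub { LocationId = location.LocationID };
            bag.Add(entity);
        });

        // ConcurrentBag has no ordering guarantee, so sort for a stable result.
        return bag.OrderBy(e => e.LocationId, StringComparer.Ordinal).ToList();
    }
}
```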

daelmo
Support Team
Posts: 8245
Joined: 28-Nov-2005
Posted on: 01-Mar-2017 07:03:36

For this kind of application I would recommend using Adapter instead of SelfServicing, because it gives you much more control over the connection. In this case it would help to:

  • Keep the connection open
  • Reduce the chances of lazy loading
  • Avoid fetching all fields in your validation
  • Save all the entities at once, not one by one. This way you reuse the connection and the transaction.

Is it possible for you to migrate to the Adapter template set?
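The last point (save everything at once and reuse the connection and transaction) is mostly about round trips. A minimal sketch, assuming a hypothetical CountingStore that only counts how often a connection/transaction is opened; it is not an LLBLGen Pro API, and in LLBLGen Pro the batched variant would correspond to a collection save such as SaveMulti.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the database: it only counts how many
// connection/transaction round trips are made. Not an LLBLGen Pro API.
public sealed class CountingStore
{
    public int ConnectionsOpened { get; private set; }
    public int RowsWritten { get; private set; }

    // Simulates: open connection, begin transaction, write all rows, commit, close.
    public void SaveBatch(IReadOnlyCollection<string> rows)
    {
        ConnectionsOpened++;
        RowsWritten += rows.Count;
    }
}

public static class BatchSaving
{
    // Row by row: every row pays for its own connection and transaction.
    public static void SaveOneByOne(CountingStore store, IEnumerable<string> rows)
    {
        foreach (var row in rows)
        {
            store.SaveBatch(new[] { row });
        }
    }

    // Batched: each connection/transaction covers up to batchSize rows.
    public static void SaveInBatches(CountingStore store, IReadOnlyList<string> rows, int batchSize)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        for (int start = 0; start < rows.Count; start += batchSize)
        {
            int end = Math.Min(start + batchSize, rows.Count);
            var batch = new List<string>(end - start);
            for (int i = start; i < end; i++) batch.Add(rows[i]);
            store.SaveBatch(batch);
        }
    }
}
```

With 60,000 rows and a batch size of 500, SaveOneByOne opens 60,000 connections while SaveInBatches opens 120; the batch size itself is an arbitrary choice here.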

David Elizondo | LLBLGen Support Team
Otis
LLBLGen Pro Team
Posts: 39614
Joined: 17-Aug-2003
Posted on: 01-Mar-2017 17:08:04

I don't think Adapter will help here. The ForEach does the saves on the thread pool, but Save(true) here persists whole graphs. I think each graph shares entity instances with other graphs: if the method that builds an entity graph re-uses entity instances, they will appear in multiple graphs, and saving one graph on thread A and another on thread B will make the graph traversal on thread A wander into the graph of thread B, and vice versa. This creates problems because those entities are then being used by multiple threads at once.

Do you cache / share entity instances in multiple graphs built with the populateLocation method? Be sure the graphs returned by that method are unique and don't share an entity instance with another graph.
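One way to check for the sharing described above is to compare the graphs by object identity. A minimal sketch of a hypothetical diagnostic helper, assuming each graph has already been flattened into a list of the objects it contains (how to walk an LLBLGen entity graph is not shown here); it reports every instance that appears in more than one graph.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;

public static class GraphSharing
{
    // Compares objects by reference, not by Equals/GetHashCode overrides,
    // so two distinct entities with equal field values are not confused.
    private sealed class ByReference : IEqualityComparer<object>
    {
        public new bool Equals(object? x, object? y) => ReferenceEquals(x, y);
        public int GetHashCode(object obj) => RuntimeHelpers.GetHashCode(obj);
    }

    // Returns every object instance that appears in more than one graph.
    // Each graph is given as the flat list of objects it contains.
    public static List<object> FindShared(IEnumerable<IEnumerable<object>> graphs)
    {
        var firstGraph = new Dictionary<object, int>(new ByReference());
        var shared = new HashSet<object>(new ByReference());
        int graphIndex = 0;
        foreach (var graph in graphs)
        {
            foreach (var node in graph)
            {
                if (!firstGraph.TryGetValue(node, out int first))
                    firstGraph[node] = graphIndex;   // first time this instance is seen
                else if (first != graphIndex)
                    shared.Add(node);                // seen before in a different graph
            }
            graphIndex++;
        }
        return new List<object>(shared);
    }
}
```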

Frans Bouma | Lead developer LLBLGen Pro
dazedorconfused
Posts: 89
Joined: 06-Apr-2006
Posted on: 01-Mar-2017 18:32:10

Thank you both for the insight. All populateLocation does is take the entity by ref and set its property values from the POCO.

I restructured my program so that it breaks the List<POCO> into chunks and passes each chunk to the process method, which loads any existing locations into an entity collection using a predicate. Then I update the existing entities and add any new ones. Finally I use the collection's SaveMulti method to speed up the save.

This has cut processing time from 34 hours to 44 minutes.
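The restructured flow (split into chunks, fetch the existing rows for a chunk in one query, update those, add the missing ones, save the chunk as a batch) can be sketched in plain C#. This is an in-memory illustration, not the poster's code: LocationPoco, LocationRow and the fetchExisting delegate are hypothetical stand-ins for the POCO, the entity and the predicate-based fetch into an entity collection, and the returned list is what would then be passed to a batch save such as SaveMulti.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins: the POCO from the legacy file and the persisted row.
public sealed record LocationPoco(string LocationId, string Name);

public sealed class LocationRow
{
    public string LocationId { get; init; } = "";
    public string Name { get; set; } = "";
}

public static class ChunkedUpsert
{
    // Splits the input into consecutive chunks of at most chunkSize items.
    public static IEnumerable<List<T>> Chunk<T>(IReadOnlyList<T> items, int chunkSize)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));
        for (int start = 0; start < items.Count; start += chunkSize)
        {
            int end = Math.Min(start + chunkSize, items.Count);
            var chunk = new List<T>(end - start);
            for (int i = start; i < end; i++) chunk.Add(items[i]);
            yield return chunk;
        }
    }

    // Fetches existing rows for the chunk's keys in one call, updates them,
    // creates rows for missing keys, and returns everything for one batch save.
    public static List<LocationRow> PrepareChunk(
        List<LocationPoco> chunk,
        Func<IEnumerable<string>, IEnumerable<LocationRow>> fetchExisting)
    {
        var keys = new List<string>(chunk.Count);
        foreach (var poco in chunk) keys.Add(poco.LocationId);

        var byId = new Dictionary<string, LocationRow>(StringComparer.Ordinal);
        foreach (var existing in fetchExisting(keys)) byId[existing.LocationId] = existing;

        foreach (var poco in chunk)
        {
            if (!byId.TryGetValue(poco.LocationId, out var row))
            {
                row = new LocationRow { LocationId = poco.LocationId };
                byId[poco.LocationId] = row;
            }
            row.Name = poco.Name; // populate fields from the POCO
        }
        return new List<LocationRow>(byId.Values);
    }
}
```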

Otis
LLBLGen Pro Team
Posts: 39614
Joined: 17-Aug-2003
Posted on: 01-Mar-2017 20:31:04

If you refactor populateLocation to return a new entity instead of passing it by 'ref', that might help too, but I have to look up the exact characteristics with regard to multi-threading and ref.

That is indeed a better approach, as it uses one transaction per batch and keeps the connection open for a burst save, instead of opening and closing it around every row. Still, a profiling run might give more insight into where the time is lost, as 44 minutes still sounds like a long time (i.e. what exactly is waiting for what).
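Before reaching for a full profiler, a rough per-phase breakdown can already show whether fetching, populating or saving dominates. A minimal sketch using System.Diagnostics.Stopwatch; PhaseTimer and the phase names are hypothetical, and the class is not thread-safe, so it is meant for the sequential parts of the import.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Accumulates wall-clock time per named phase (e.g. "fetch", "populate", "save").
// A lightweight stand-in for a real profiler; not thread-safe, use from one thread.
public sealed class PhaseTimer
{
    private readonly Dictionary<string, TimeSpan> _totals =
        new Dictionary<string, TimeSpan>(StringComparer.Ordinal);

    public IReadOnlyDictionary<string, TimeSpan> Totals => _totals;

    // Runs 'action', adds its elapsed time to 'phase', and returns its result.
    public T Measure<T>(string phase, Func<T> action)
    {
        var sw = Stopwatch.StartNew();
        try
        {
            return action();
        }
        finally
        {
            sw.Stop();
            _totals.TryGetValue(phase, out var soFar); // zero if the phase is new
            _totals[phase] = soFar + sw.Elapsed;
        }
    }

    // Overload for phases that return nothing, such as a save call.
    public void Measure(string phase, Action action) =>
        Measure<bool>(phase, () => { action(); return true; });
}
```

For example, wrapping each chunk's save as `timer.Measure("save", () => { collection.SaveMulti(); });` (where `collection` is whatever collection the import saves) and the fetch in a separate phase gives totals that can be compared after a run.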

Frans Bouma | Lead developer LLBLGen Pro