Tuesday, November 5, 2013

SQL CLR Multi-Threaded File Copy

I created a CLR C# Stored Procedure to Copy, Move and/or Delete files directly from T-SQL. This Procedure lets you pass in large sets of files, Queues them up, and processes them in parallel with an adjustable parallel thread pool size. We use RoboCopy, like many of you, to move files such as backups, data exports, reports, and archive scripts. In most cases RoboCopy has worked great but recently we have been having more problems with performance because the size and quantity of these files has increased. Many of these large backup files are used to create dev, test, or stage work environments or to start Disaster Recovery (DR) copies of production with either log shipping or mirroring which require a full restore of the database to get started. Now we are having problems getting these backups copied and restored within the maintenance windows available to us. We started exploring multi-file backups where a single backup is written to up to 64 files in parallel. Multi-file backups greatly increased the speed for both the backups and the restores but, even with 64 files that were 1/64th the original size, we were still having a copy bottleneck. The RoboCopy process was still only pushing the files one at a time and not even coming close to utilizing the full bandwidth available. This did help alleviate starting over from the beginning if we had a network “Hiccup” during the copy, but t was not speeding up the total time to get the files copied. I did notice that there was a multi-threaded version of RoboCopy but it was only available on specific OS versions. I started looking at other multi-threaded file copy processes such as RichCopy but found them cumbersome to use and deploy to every machine that wanted to use them. At this point we have been using CLR fairly heavily and have a great build and deploy process set up which can push out our CLR code to several hundred servers quickly and reliably so I thought about how much better it would be if file handling was done directly from T-SQL. But in order to do this, I would need to so some things that we had not done before in CLR. Mainly Multi-Threading and Asynchronous processing. Our primary requirement was that the procedure would not be asynchronous but the work it was doing would be. In other words, each file being copied would spin up a new child thread but the procedure would watch each child thread and not complete until all child threads were complete. This way we would be sure the process was complete without having to build in a polling process. In order to make the calls asynchronous, I used the System.Threading.Thread Class. In order to pass enough information to the child thread so that it could run on its own and behave correctly, I built the call to pass a CurrentStatus Class.
public class CurrentStatus
     {
     public string Action;
     public string Name;
     public string Source;
     public string Destination;
     public long Size;
     public DateTime Started;
     public DateTime Finnished;
     public long Coppied;
     public Thread Thrd;
     public Thread Parent;
     public string StatusMSG;
     public int StatusCode;
     }
Another important requirement was that all child threads would be killed if the main procedure was canceled. You would not want zombie threads still copying files if the code had been aborted. This was a little harder to get working as I could not get the child threads to observe the cancel or abort being sent to the parent on their own. I did get the desired result by getting each child thread to look for the parent thread (CurrentStatus.Parent) in between each buffered read/write cycle in the copy. If the child thread looks for the parent and finds it is no longer active, the child thread aborts the copy and drops out. We also needed some sort of status update to be returned so that the query results would show progress. It was also important that this would not overload the output with too much noise. Each time a child thread starts or compleats, a line is sent to the output. The UpdateInterval parameter is used to set how often the parent procedure reports the current status of the running child threads. There is also a Verbose parameter which can compleatly turn off all output if it is too much noise.  The procedure would need a queuing system so that you could limit the number of parallel threads. We found many cases where too many threads would cause memory issues and when copying across the network, too many connections would cause the server to think it was being DOS'd and kill all the connections. The procedure allows you to specify the QueueMax parameter to limit the number of threads that would be allowed to run at the same time. After the number of threads specified are called and running, the parent procedure continues to check the progress of each running thread. As each child thread completes, the parent procedure start a new child thread if there were still entries to be done. Once all threads have completed, then the parent thread will exit. RoboCopy uses the "1/1/1980" date on files being copied to identify files that it was working on but had not yet completed. We found this to be a very usefull method to externaly check in on a copy process to see if it was done yet so I recreated that ability within the procedure. When re-starting a copy process I could also look to see if the destination existed with the "1/1/1980" date and identify files that could restart copying from the point it left off.  I did have a problem when it came to passing in the data to the procedure. I wanted to pass a table parameter with the files to be coppied but quickly found out the CLR Procedures do not allow table parameters. My solution was to use an XML parameter. XML gave me the ability to pass an unlimited list of files and all of the settings as a single parameter in an XML block that looks like the example below.

  
    
    
  
This gives me the ability to have control parameters and the list of files in the same block of data and I designed this XML to be easily generated from a T-SQL Query by using the FOR XML clause. Here is an example of the query used to generate the XML. This uses another CLR Function that returns a directory listing of files and could be replaced with any process that returns a list of file names.
DECLARE   @Data             XML
          ,@Source          VarChar(max)
          ,@Destination     VarChar(max)
          ,@Mask            VarChar(max)

SELECT    @Source           = ‘C:\’
          ,@Destination     = ‘D:\’
          ,@Mask            = ‘dbaadmin*’

;WITH     Settings
          AS
          (
          SELECT    32          AS [QueueMax]            -- Max Number of files coppied at once.
                    ,'false'    AS [ForceOverwrite]      -- true,false
                    ,1          AS [Verbose]             -- -1 = Silent, 0 = Normal, 1 = Percent Updates
                    ,300        AS [UpdateInterval]      -- rate of progress updates in Seconds
          )
          ,CopyFile -- MoveFile, DeleteFile
          AS
          (
          SELECT    FullPathName             AS [Source]
                    ,@Destination + Name     AS [Destination]
          FROM      dbaudf_DirectoryList2(@Source,@Mask,0)
          )
SELECT    @Data = (
                  SELECT          *
                                  ,(SELECT * FROM CopyFile FOR XML RAW ('CopyFile'), TYPE)
                  FROM            Settings
                  FOR XML RAW ('Settings'),TYPE, ROOT('FileProcess')
                  )
Notice the naming of the CTE CopyFile.This can be changed to MoveFile or DeleteFile, along with the call in the final part of the query to change the XML records into Move or Delete records. I don’t recommend using more than one method in a single XML block but it could be possible to mix them if needed. This single query made it possible to pass all the data needed to the CLR code within a single parameter.
Here is my complete C# Code:
using System;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using Microsoft.SqlServer.Server;
using System.Text;
using System.IO;
using System.Diagnostics;
using System.Threading;
using System.Xml;
using System.Collections;
using System.Globalization;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.ComponentModel;
using System.Security.Cryptography;
using System.Runtime.InteropServices;
using System.Runtime.Remoting.Messaging;


namespace SQLDBA.Operations
{
    public partial class StoredProcedures
    {
        public class CurrentStatus
        {
            public string Action;
            public string Name;
            public string Source;
            public string Destination;
            public long Size;
            public DateTime Started;
            public DateTime Finnished;
            public long Coppied;
            public Thread Thrd;
            public Thread Parent;
            public string StatusMSG;
            public int StatusCode;
        }

        private const uint FILE_FLAG_NO_BUFFERING = 0x20000000;
        private const uint FILE_FLAG_SEQUENTIAL_SCAN = 0x08000000;
        private const uint FILE_FLAG_OVERLAPPED = 0x40000000;

        [Microsoft.SqlServer.Server.SqlProcedure]
        public static void dbasp_FileHandler(SqlXml data)
        {
            List _Status = new List();

            int TotalFiles = 0;
            long TotalSize = 0;
            int CompletedFiles = 0;
            long CompletedSize = 0;
            int CompletedFilesLast = -1;
            int RunningFilesLast = -1;
            int PendingFilesLast = -1;
            int QueueMax = 64;
            int UpdateInterval = 1;
            Boolean ForceOverwrite = false;
            int Verbose = 0;

            using (SqlConnection sqlCon = new SqlConnection("context connection=true"))
            {
                sqlCon.Open();
                using (XmlReader xmlr = data.CreateReader())
                {
                    DataSet ds = new DataSet();
                    ds.ReadXml(xmlr);
                    foreach (DataTable tbl in ds.Tables)
                    {
                        foreach (DataRow dr in tbl.Rows)
                        {
                            if (tbl.TableName == "Settings")
                            {
                                foreach (DataColumn dc in tbl.Columns)
                                {
                                    //
                                    // SETTING FORCES THE MAXIMUM NUMBER OF PARALLEL THREADS TO BE UNDER THE 64 THREAD DEFAULT
                                    //
                                    if (dc.ColumnName == "QueueMax")
                                    {
                                        if (dr[dc] != null)
                                            QueueMax = Convert.ToInt16(dr[dc].ToString());

                                        if (QueueMax > 64)
                                            QueueMax = 64;

                                        if (QueueMax < 1)
                                            QueueMax = 1;
                                    }

                                    //
                                    // SETTING FORCES ALL COPPIES TO OVERWRITE EVEN IF FILE LOOKS THE SAME
                                    //
                                    if (dc.ColumnName == "ForceOverwrite")
                                        ForceOverwrite = Convert.ToBoolean(dr[dc].ToString());

                                    //
                                    // CHANGES THE OUTPUT SETTINGS 0 = NORMAL, -1 = NONE, 1 = DEBUG
                                    //
                                    if (dc.ColumnName == "Verbose")
                                        Verbose = Convert.ToInt16(dr[dc].ToString());

                                    //
                                    // RATE OF PROGRESS UPDATES IN SECONDS
                                    //
                                    if (dc.ColumnName == "UpdateInterval")
                                        UpdateInterval = Convert.ToInt16(dr[dc].ToString());

                                }
                            }
                            else
                            {
                                CurrentStatus CS = new CurrentStatus();
                                string Source = null;
                                string Destination = null;
                                string FileName = null;
                                long FileSize = 0;

                                FileInfo SFI = null;
                                FileInfo DFI = null;
                                Thread FT = null;
                                BackGroundAction BA = null;

                                // SET ACTION
                                switch (tbl.TableName)
                                {
                                    case "CopyFile":
                                        CS.Action = "Copy";
                                        break;
                                    case "MoveFile":
                                        CS.Action = "Move";
                                        break;
                                    case "DeleteFile":
                                        CS.Action = "Delete";
                                        break;
                                    default:
                                        CS.Action = "Unknown";
                                        break;
                                }

                                if (CS.Action == "Unknown")
                                    SqlContext.Pipe.Send("Unhandled Row Type: " + tbl.TableName);
                                else
                                {
                                    //////////////////////////////
                                    //  START PROCESSING RECORD //
                                    //////////////////////////////
                                    CS.Source = Source = dr.ItemArray.GetValue(0).ToString();
                                    if (System.IO.File.Exists(Source))
                                    {
                                        SFI = new FileInfo(Source);
                                        CS.Name = FileName = SFI.Name;
                                        CS.Size = FileSize = SFI.Length;
                                        CS.Coppied = 0;

                                        TotalFiles = TotalFiles + 1;
                                        TotalSize = TotalSize + FileSize;

                                        if (CS.Action != "Delete")
                                        {
                                            CS.Destination = Destination = dr.ItemArray.GetValue(1).ToString();

                                            //DOES DEST ALREADY EXIST
                                            if (System.IO.File.Exists(Destination))
                                            {
                                                DFI = new FileInfo(Destination);

                                                // WAS THE FILE INTERUPTED DURRING LAST COPY
                                                if (DFI.CreationTime == Convert.ToDateTime("01/01/1980"))
                                                {
                                                    if (DFI.Length > 0 && ForceOverwrite == false)
                                                        CS.StatusMSG = "Exists - Resume";
                                                    else
                                                        CS.StatusMSG = "Exists - Different";
                                                }

                                                // IS CURRENT DEST FILE DIFFERENT
                                                else if (SFI.CreationTimeUtc == DFI.CreationTimeUtc && SFI.LastWriteTimeUtc == DFI.LastWriteTimeUtc && SFI.Length == DFI.Length)
                                                    CS.StatusMSG = "Exists - Same";
                                                else
                                                    CS.StatusMSG = "Exists - Different";
                                            }
                                            else
                                                CS.StatusMSG = "New";
                                        }

                                        if (ForceOverwrite && CS.StatusMSG == "Exists - Same")
                                            CS.StatusMSG = "Exists - Same (OverWrite Forced)";

                                        // ONLY CREATE THREAD IF ACTION IS NEEDED
                                        if (CS.StatusMSG != "Exists - Same")
                                        {
                                            BA = new BackGroundAction(CS);
                                            CS.Thrd = FT = new Thread(new ThreadStart(BA.DoIt));
                                            FT.Name = CS.Action + " " + FileName;
                                            CS.Parent = Thread.CurrentThread;
                                        }

                                        if (CS.Action == "Delete")
                                            SqlContext.Pipe.Send(CS.Action + " " + Source);
                                        else
                                            SqlContext.Pipe.Send(CS.Action + " " + Source + " to " + Destination + "  (" + FileSize.ToString() + ")");
                                    }
                                    else
                                        SqlContext.Pipe.Send("*** File Does Not Exist: " + Source);
                                };

                                _Status.Add(CS);
                            }
                        }
                    }
                }

                // USE RAISERROR TO GET IMIDEIATE SCREEN RESPONSE
                SqlCommand sqlCommand = new SqlCommand(String.Format("raiserror('{0}', -1, -1) with nowait", "Waiting for all threads to complete."), sqlCon);
                SqlContext.Pipe.ExecuteAndSend(sqlCommand);

                Boolean ReportUpdate = true;
                DateTime ReportTime = DateTime.MinValue;

                while (_Status.FindAll(FindDone).Count < _Status.Count)
                {
                    if (DateTime.Now.Subtract(ReportTime).TotalSeconds >= UpdateInterval)
                    {
                        ReportUpdate = true;
                        ReportTime = DateTime.Now;
                    }

                    //
                    //  FILL OR TOP OFF THE QUEUE 
                    //
                    foreach (CurrentStatus CS in _Status.FindAll(FindPending))
                    {
                        if (_Status.FindAll(FindRunning).Count < QueueMax)
                        {
                            CS.Started = DateTime.Now;

                            if (CS.Thrd != null)
                                CS.Thrd.Start();
                        }
                    }

                    //
                    //  CHECK RUNNING THREADS
                    //
                    foreach (CurrentStatus CS in _Status.FindAll(FindRunning))
                    {
                        if (CS.Thrd == null || CS.Thrd.IsAlive == false)
                        {
                            CS.Finnished = DateTime.Now;
                            CS.Coppied = CS.Size;
                            CompletedFiles = CompletedFiles + 1;
                            CompletedSize = CompletedSize + CS.Size;

                            if (CS.StatusMSG == "Exists - Same")
                                SqlContext.Pipe.Send("    Skipped " + CS.Action + " " + CS.Name + "  (SAME FILE)");

                            if (CS.StatusMSG == "Exists - Same (OverWrite Forced)")
                                SqlContext.Pipe.Send("    Replaced " + CS.Action + " " + CS.Name + "  (SAME FILE - Overwrite Forced)");

                            else if (CS.StatusMSG == "Exists - Different")
                                SqlContext.Pipe.Send("    Replaced " + CS.Action + " " + CS.Name + "  (CHANGED FILE)");

                            else if (CS.StatusMSG == "Exists - Resume")
                                SqlContext.Pipe.Send("    Resumed " + CS.Action + " " + CS.Name + "  (PARTIAL FILE)");

                            else
                                SqlContext.Pipe.Send("    Finnished " + CS.Action + " " + CS.Name);
                        }
                        else
                        {
                            if (System.IO.File.Exists(CS.Destination))
                            {
                                CS.Coppied = new FileInfo(CS.Destination).Length;

                                if (ReportUpdate && Verbose == 1)
                                {
                                    double PctDone = ((CS.Coppied * 100.0) / CS.Size) / 100.0;
                                    string Msg = CS.Action + " " + CS.Name;
                                    int gap = 80 - Msg.Length;


                                    var gapString = new StringBuilder(gap);
                                    for (int i = 0; i < gap; i++)
                                    {
                                        gapString.Append(" ");
                                    }


                                    Msg = "raiserror('    " + CS.Action + " " + CS.Name + gapString + "( " + PctDone.ToString("P") + "%)', -1, -1) with nowait";
                                    sqlCommand = new SqlCommand(Msg, sqlCon);
                                    SqlContext.Pipe.ExecuteAndSend(sqlCommand);
                                }
                            }
                        }
                    }

                    if (ReportUpdate && Verbose == 1)
                        SqlContext.Pipe.Send(" ");

                    ReportUpdate = false;

                    // USE RAISERROR TO GET IMIDEIATE SCREEN RESPONSE ONLY IF PROGRESS IS CHANGED
                    if (CompletedFilesLast != CompletedFiles || RunningFilesLast != _Status.FindAll(FindRunning).Count || PendingFilesLast != _Status.FindAll(FindPending).Count )
                    {
                        ReportUpdate = true;
                        sqlCommand = new SqlCommand(String.Format("raiserror('{0}', -1, -1) with nowait", CompletedFiles.ToString() + " of " + TotalFiles.ToString() + " completed. " + _Status.FindAll(FindRunning).Count.ToString() + " Currently Running. " + _Status.FindAll(FindPending).Count.ToString() + " Currently Pending."), sqlCon);
                        SqlContext.Pipe.ExecuteAndSend(sqlCommand);
                        CompletedFilesLast = CompletedFiles;
                        RunningFilesLast = _Status.FindAll(FindRunning).Count;
                        PendingFilesLast = _Status.FindAll(FindPending).Count;
                    }

                    
                    //Thread.Sleep(UpdateInterval*1000); // WAIT X SECONDs
                }

                sqlCon.Close();

            }
            SqlContext.Pipe.Send("Done.");
        }

        private static bool FindPending(CurrentStatus CS)
        {

            if (CS.Started == DateTime.MinValue)
            {
                return true;
            }
            {
                return false;
            }

        }

        private static bool FindRunning(CurrentStatus CS)
        {

            if (CS.Started > DateTime.MinValue && CS.Finnished == DateTime.MinValue)
            {
                return true;
            }
            {
                return false;
            }

        }

        private static bool FindDone(CurrentStatus CS)
        {

            if (CS.Finnished > DateTime.MinValue)
            {
                return true;
            }
            {
                return false;
            }

        }


        private static bool FindMatch(CurrentStatus CS, CurrentStatus MS)
        {
            if (CS == MS)
            {
                return true;
            }
            {
                return false;
            }
        }



        /// 
        /// 
        /// 
        public class BackGroundAction
        {
            // State information used in the task. 
            private CurrentStatus CS;

            /// 
            /// The constructor obtains the state information. 
            /// 
            public BackGroundAction(CurrentStatus CurStat)
            {
                CS = CurStat;
            }

            /// 
            /// The thread procedure performs the task. 
            /// 
            public void DoIt()
            {
                RuntimeHelpers.PrepareConstrainedRegions();
                try { }
                finally
                {
                    if (CS.Action == "Copy")
                    {

                        //System.IO.File.Copy(CS.Source, CS.Destination, true);

                        FileInfo DFI = new FileInfo(CS.Destination);
                        FileInfo SFI = new FileInfo(CS.Source);

                        if (CS.StatusMSG == "Exists - Different")
                            DFI.Delete();

                        if (!DFI.Exists)
                        {
                            using (FileStream DFS = DFI.Create())
                            {
                                DFS.Close();
                            }

                            DFI.CreationTime = Convert.ToDateTime("01/01/1980");
                        }

                        int size = 256 * 1024; //buffer size

                        using (FileStream DF = new FileStream(CS.Destination,FileMode.Append,FileAccess.Write))
                        {
                            using (FileStream SF = SFI.OpenRead())
                            {
                                if (CS.StatusMSG == "Exists - Resume")
                                    SF.Seek(DF.Length, SeekOrigin.Begin);

                                //int size = (SF.CanSeek) ? Math.Min((int)(SF.Length - SF.Position), 0x2000) : 0x2000;

                                byte[][] buffer = new byte[2][];
                                buffer[0] = new byte[size];
                                buffer[1] = new byte[size];
                                var inputBuffer = buffer[0];
                                int bytesRead;
                                IAsyncResult writeResult = null;
                                
                                while ((bytesRead = SF.Read(inputBuffer, 0, size)) != 0)
                                {
                                    if (!CS.Parent.IsAlive)
                                        Thread.CurrentThread.Abort();

                                    // Wait for pending write
                                    if (writeResult != null)
                                    {
                                        writeResult.AsyncWaitHandle.WaitOne();
                                        DF.EndWrite(writeResult);
                                        writeResult = null;
                                    }
                                    // Assign the output buffer
                                    var outputBuffer = inputBuffer;
                                    // and swap input buffers
                                    inputBuffer = (inputBuffer == buffer[0]) ? buffer[1] : buffer[0];
                                    // begin asynchronous write
                                    writeResult = DF.BeginWrite(outputBuffer, 0, bytesRead, null, null);
                                    //Thread.Sleep(100);
                                }
                                if (writeResult != null)
                                {
                                    writeResult.AsyncWaitHandle.WaitOne();
                                    DF.EndWrite(writeResult);
                                }
                            }
                        }

                        DFI.CreationTimeUtc = SFI.CreationTimeUtc;
                        DFI.LastWriteTimeUtc = SFI.LastWriteTimeUtc;
                    }
                    if (CS.Action == "Move")
                    {
                        if (System.IO.File.Exists(CS.Destination))  // IF DESTINATION ALREADY EXISTS
                        {
                            if (System.IO.File.Exists(CS.Destination + ".old")) // IF DESTINATION.OLD ALREADY EXISTS
                                System.IO.File.Delete(CS.Destination + ".old"); // DELETE DESTINATION.OLD

                            System.IO.File.Move(CS.Destination, CS.Destination + ".old"); //RENAME DESTINATION TO DESTINATION.OLD
                        }
                        System.IO.File.Move(CS.Source, CS.Destination); // MOVE FILE
                    }
                    if (CS.Action == "Delete")
                    {
                        System.IO.File.Delete(CS.Source);
                    }
                }
            }
        }
    }
}

No comments:

Post a Comment