public class CurrentStatus { public string Action; public string Name; public string Source; public string Destination; public long Size; public DateTime Started; public DateTime Finnished; public long Coppied; public Thread Thrd; public Thread Parent; public string StatusMSG; public int StatusCode; }Another important requirement was that all child threads would be killed if the main procedure was canceled. You would not want zombie threads still copying files if the code had been aborted. This was a little harder to get working as I could not get the child threads to observe the cancel or abort being sent to the parent on their own. I did get the desired result by getting each child thread to look for the parent thread (CurrentStatus.Parent) in between each buffered read/write cycle in the copy. If the child thread looks for the parent and finds it is no longer active, the child thread aborts the copy and drops out. We also needed some sort of status update to be returned so that the query results would show progress. It was also important that this would not overload the output with too much noise. Each time a child thread starts or compleats, a line is sent to the output. The UpdateInterval parameter is used to set how often the parent procedure reports the current status of the running child threads. There is also a Verbose parameter which can compleatly turn off all output if it is too much noise. The procedure would need a queuing system so that you could limit the number of parallel threads. We found many cases where too many threads would cause memory issues and when copying across the network, too many connections would cause the server to think it was being DOS'd and kill all the connections. The procedure allows you to specify the QueueMax parameter to limit the number of threads that would be allowed to run at the same time. After the number of threads specified are called and running, the parent procedure continues to check the progress of each running thread. As each child thread completes, the parent procedure start a new child thread if there were still entries to be done. Once all threads have completed, then the parent thread will exit. RoboCopy uses the "1/1/1980" date on files being copied to identify files that it was working on but had not yet completed. We found this to be a very usefull method to externaly check in on a copy process to see if it was done yet so I recreated that ability within the procedure. When re-starting a copy process I could also look to see if the destination existed with the "1/1/1980" date and identify files that could restart copying from the point it left off. I did have a problem when it came to passing in the data to the procedure. I wanted to pass a table parameter with the files to be coppied but quickly found out the CLR Procedures do not allow table parameters. My solution was to use an XML parameter. XML gave me the ability to pass an unlimited list of files and all of the settings as a single parameter in an XML block that looks like the example below.
This gives me the ability to have control parameters and the list of files in the same block of data and I designed this XML to be easily generated from a T-SQL Query by using the FOR XML clause. Here is an example of the query used to generate the XML. This uses another CLR Function that returns a directory listing of files and could be replaced with any process that returns a list of file names.
DECLARE @Data XML ,@Source VarChar(max) ,@Destination VarChar(max) ,@Mask VarChar(max) SELECT @Source = ‘C:\’ ,@Destination = ‘D:\’ ,@Mask = ‘dbaadmin*’ ;WITH Settings AS ( SELECT 32 AS [QueueMax] -- Max Number of files coppied at once. ,'false' AS [ForceOverwrite] -- true,false ,1 AS [Verbose] -- -1 = Silent, 0 = Normal, 1 = Percent Updates ,300 AS [UpdateInterval] -- rate of progress updates in Seconds ) ,CopyFile -- MoveFile, DeleteFile AS ( SELECT FullPathName AS [Source] ,@Destination + Name AS [Destination] FROM dbaudf_DirectoryList2(@Source,@Mask,0) ) SELECT @Data = ( SELECT * ,(SELECT * FROM CopyFile FOR XML RAW ('CopyFile'), TYPE) FROM Settings FOR XML RAW ('Settings'),TYPE, ROOT('FileProcess') )Notice the naming of the CTE CopyFile.This can be changed to MoveFile or DeleteFile, along with the call in the final part of the query to change the XML records into Move or Delete records. I don’t recommend using more than one method in a single XML block but it could be possible to mix them if needed. This single query made it possible to pass all the data needed to the CLR code within a single parameter.
Here is my complete C# Code:
using System; using System.Data; using System.Data.SqlClient; using System.Data.SqlTypes; using Microsoft.SqlServer.Server; using System.Text; using System.IO; using System.Diagnostics; using System.Threading; using System.Xml; using System.Collections; using System.Globalization; using System.Collections.Generic; using System.Runtime.CompilerServices; using System.ComponentModel; using System.Security.Cryptography; using System.Runtime.InteropServices; using System.Runtime.Remoting.Messaging; namespace SQLDBA.Operations { public partial class StoredProcedures { public class CurrentStatus { public string Action; public string Name; public string Source; public string Destination; public long Size; public DateTime Started; public DateTime Finnished; public long Coppied; public Thread Thrd; public Thread Parent; public string StatusMSG; public int StatusCode; } private const uint FILE_FLAG_NO_BUFFERING = 0x20000000; private const uint FILE_FLAG_SEQUENTIAL_SCAN = 0x08000000; private const uint FILE_FLAG_OVERLAPPED = 0x40000000; [Microsoft.SqlServer.Server.SqlProcedure] public static void dbasp_FileHandler(SqlXml data) { List_Status = new List (); int TotalFiles = 0; long TotalSize = 0; int CompletedFiles = 0; long CompletedSize = 0; int CompletedFilesLast = -1; int RunningFilesLast = -1; int PendingFilesLast = -1; int QueueMax = 64; int UpdateInterval = 1; Boolean ForceOverwrite = false; int Verbose = 0; using (SqlConnection sqlCon = new SqlConnection("context connection=true")) { sqlCon.Open(); using (XmlReader xmlr = data.CreateReader()) { DataSet ds = new DataSet(); ds.ReadXml(xmlr); foreach (DataTable tbl in ds.Tables) { foreach (DataRow dr in tbl.Rows) { if (tbl.TableName == "Settings") { foreach (DataColumn dc in tbl.Columns) { // // SETTING FORCES THE MAXIMUM NUMBER OF PARALLEL THREADS TO BE UNDER THE 64 THREAD DEFAULT // if (dc.ColumnName == "QueueMax") { if (dr[dc] != null) QueueMax = Convert.ToInt16(dr[dc].ToString()); if (QueueMax > 64) QueueMax = 64; if (QueueMax < 1) QueueMax = 1; } // // SETTING FORCES ALL COPPIES TO OVERWRITE EVEN IF FILE LOOKS THE SAME // if (dc.ColumnName == "ForceOverwrite") ForceOverwrite = Convert.ToBoolean(dr[dc].ToString()); // // CHANGES THE OUTPUT SETTINGS 0 = NORMAL, -1 = NONE, 1 = DEBUG // if (dc.ColumnName == "Verbose") Verbose = Convert.ToInt16(dr[dc].ToString()); // // RATE OF PROGRESS UPDATES IN SECONDS // if (dc.ColumnName == "UpdateInterval") UpdateInterval = Convert.ToInt16(dr[dc].ToString()); } } else { CurrentStatus CS = new CurrentStatus(); string Source = null; string Destination = null; string FileName = null; long FileSize = 0; FileInfo SFI = null; FileInfo DFI = null; Thread FT = null; BackGroundAction BA = null; // SET ACTION switch (tbl.TableName) { case "CopyFile": CS.Action = "Copy"; break; case "MoveFile": CS.Action = "Move"; break; case "DeleteFile": CS.Action = "Delete"; break; default: CS.Action = "Unknown"; break; } if (CS.Action == "Unknown") SqlContext.Pipe.Send("Unhandled Row Type: " + tbl.TableName); else { ////////////////////////////// // START PROCESSING RECORD // ////////////////////////////// CS.Source = Source = dr.ItemArray.GetValue(0).ToString(); if (System.IO.File.Exists(Source)) { SFI = new FileInfo(Source); CS.Name = FileName = SFI.Name; CS.Size = FileSize = SFI.Length; CS.Coppied = 0; TotalFiles = TotalFiles + 1; TotalSize = TotalSize + FileSize; if (CS.Action != "Delete") { CS.Destination = Destination = dr.ItemArray.GetValue(1).ToString(); //DOES DEST ALREADY EXIST if (System.IO.File.Exists(Destination)) { DFI = new FileInfo(Destination); // WAS THE FILE INTERUPTED DURRING LAST COPY if (DFI.CreationTime == Convert.ToDateTime("01/01/1980")) { if (DFI.Length > 0 && ForceOverwrite == false) CS.StatusMSG = "Exists - Resume"; else CS.StatusMSG = "Exists - Different"; } // IS CURRENT DEST FILE DIFFERENT else if (SFI.CreationTimeUtc == DFI.CreationTimeUtc && SFI.LastWriteTimeUtc == DFI.LastWriteTimeUtc && SFI.Length == DFI.Length) CS.StatusMSG = "Exists - Same"; else CS.StatusMSG = "Exists - Different"; } else CS.StatusMSG = "New"; } if (ForceOverwrite && CS.StatusMSG == "Exists - Same") CS.StatusMSG = "Exists - Same (OverWrite Forced)"; // ONLY CREATE THREAD IF ACTION IS NEEDED if (CS.StatusMSG != "Exists - Same") { BA = new BackGroundAction(CS); CS.Thrd = FT = new Thread(new ThreadStart(BA.DoIt)); FT.Name = CS.Action + " " + FileName; CS.Parent = Thread.CurrentThread; } if (CS.Action == "Delete") SqlContext.Pipe.Send(CS.Action + " " + Source); else SqlContext.Pipe.Send(CS.Action + " " + Source + " to " + Destination + " (" + FileSize.ToString() + ")"); } else SqlContext.Pipe.Send("*** File Does Not Exist: " + Source); }; _Status.Add(CS); } } } } // USE RAISERROR TO GET IMIDEIATE SCREEN RESPONSE SqlCommand sqlCommand = new SqlCommand(String.Format("raiserror('{0}', -1, -1) with nowait", "Waiting for all threads to complete."), sqlCon); SqlContext.Pipe.ExecuteAndSend(sqlCommand); Boolean ReportUpdate = true; DateTime ReportTime = DateTime.MinValue; while (_Status.FindAll(FindDone).Count < _Status.Count) { if (DateTime.Now.Subtract(ReportTime).TotalSeconds >= UpdateInterval) { ReportUpdate = true; ReportTime = DateTime.Now; } // // FILL OR TOP OFF THE QUEUE // foreach (CurrentStatus CS in _Status.FindAll(FindPending)) { if (_Status.FindAll(FindRunning).Count < QueueMax) { CS.Started = DateTime.Now; if (CS.Thrd != null) CS.Thrd.Start(); } } // // CHECK RUNNING THREADS // foreach (CurrentStatus CS in _Status.FindAll(FindRunning)) { if (CS.Thrd == null || CS.Thrd.IsAlive == false) { CS.Finnished = DateTime.Now; CS.Coppied = CS.Size; CompletedFiles = CompletedFiles + 1; CompletedSize = CompletedSize + CS.Size; if (CS.StatusMSG == "Exists - Same") SqlContext.Pipe.Send(" Skipped " + CS.Action + " " + CS.Name + " (SAME FILE)"); if (CS.StatusMSG == "Exists - Same (OverWrite Forced)") SqlContext.Pipe.Send(" Replaced " + CS.Action + " " + CS.Name + " (SAME FILE - Overwrite Forced)"); else if (CS.StatusMSG == "Exists - Different") SqlContext.Pipe.Send(" Replaced " + CS.Action + " " + CS.Name + " (CHANGED FILE)"); else if (CS.StatusMSG == "Exists - Resume") SqlContext.Pipe.Send(" Resumed " + CS.Action + " " + CS.Name + " (PARTIAL FILE)"); else SqlContext.Pipe.Send(" Finnished " + CS.Action + " " + CS.Name); } else { if (System.IO.File.Exists(CS.Destination)) { CS.Coppied = new FileInfo(CS.Destination).Length; if (ReportUpdate && Verbose == 1) { double PctDone = ((CS.Coppied * 100.0) / CS.Size) / 100.0; string Msg = CS.Action + " " + CS.Name; int gap = 80 - Msg.Length; var gapString = new StringBuilder(gap); for (int i = 0; i < gap; i++) { gapString.Append(" "); } Msg = "raiserror(' " + CS.Action + " " + CS.Name + gapString + "( " + PctDone.ToString("P") + "%)', -1, -1) with nowait"; sqlCommand = new SqlCommand(Msg, sqlCon); SqlContext.Pipe.ExecuteAndSend(sqlCommand); } } } } if (ReportUpdate && Verbose == 1) SqlContext.Pipe.Send(" "); ReportUpdate = false; // USE RAISERROR TO GET IMIDEIATE SCREEN RESPONSE ONLY IF PROGRESS IS CHANGED if (CompletedFilesLast != CompletedFiles || RunningFilesLast != _Status.FindAll(FindRunning).Count || PendingFilesLast != _Status.FindAll(FindPending).Count ) { ReportUpdate = true; sqlCommand = new SqlCommand(String.Format("raiserror('{0}', -1, -1) with nowait", CompletedFiles.ToString() + " of " + TotalFiles.ToString() + " completed. " + _Status.FindAll(FindRunning).Count.ToString() + " Currently Running. " + _Status.FindAll(FindPending).Count.ToString() + " Currently Pending."), sqlCon); SqlContext.Pipe.ExecuteAndSend(sqlCommand); CompletedFilesLast = CompletedFiles; RunningFilesLast = _Status.FindAll(FindRunning).Count; PendingFilesLast = _Status.FindAll(FindPending).Count; } //Thread.Sleep(UpdateInterval*1000); // WAIT X SECONDs } sqlCon.Close(); } SqlContext.Pipe.Send("Done."); } private static bool FindPending(CurrentStatus CS) { if (CS.Started == DateTime.MinValue) { return true; } { return false; } } private static bool FindRunning(CurrentStatus CS) { if (CS.Started > DateTime.MinValue && CS.Finnished == DateTime.MinValue) { return true; } { return false; } } private static bool FindDone(CurrentStatus CS) { if (CS.Finnished > DateTime.MinValue) { return true; } { return false; } } private static bool FindMatch(CurrentStatus CS, CurrentStatus MS) { if (CS == MS) { return true; } { return false; } } /// /// /// public class BackGroundAction { // State information used in the task. private CurrentStatus CS; ////// The constructor obtains the state information. /// public BackGroundAction(CurrentStatus CurStat) { CS = CurStat; } ////// The thread procedure performs the task. /// public void DoIt() { RuntimeHelpers.PrepareConstrainedRegions(); try { } finally { if (CS.Action == "Copy") { //System.IO.File.Copy(CS.Source, CS.Destination, true); FileInfo DFI = new FileInfo(CS.Destination); FileInfo SFI = new FileInfo(CS.Source); if (CS.StatusMSG == "Exists - Different") DFI.Delete(); if (!DFI.Exists) { using (FileStream DFS = DFI.Create()) { DFS.Close(); } DFI.CreationTime = Convert.ToDateTime("01/01/1980"); } int size = 256 * 1024; //buffer size using (FileStream DF = new FileStream(CS.Destination,FileMode.Append,FileAccess.Write)) { using (FileStream SF = SFI.OpenRead()) { if (CS.StatusMSG == "Exists - Resume") SF.Seek(DF.Length, SeekOrigin.Begin); //int size = (SF.CanSeek) ? Math.Min((int)(SF.Length - SF.Position), 0x2000) : 0x2000; byte[][] buffer = new byte[2][]; buffer[0] = new byte[size]; buffer[1] = new byte[size]; var inputBuffer = buffer[0]; int bytesRead; IAsyncResult writeResult = null; while ((bytesRead = SF.Read(inputBuffer, 0, size)) != 0) { if (!CS.Parent.IsAlive) Thread.CurrentThread.Abort(); // Wait for pending write if (writeResult != null) { writeResult.AsyncWaitHandle.WaitOne(); DF.EndWrite(writeResult); writeResult = null; } // Assign the output buffer var outputBuffer = inputBuffer; // and swap input buffers inputBuffer = (inputBuffer == buffer[0]) ? buffer[1] : buffer[0]; // begin asynchronous write writeResult = DF.BeginWrite(outputBuffer, 0, bytesRead, null, null); //Thread.Sleep(100); } if (writeResult != null) { writeResult.AsyncWaitHandle.WaitOne(); DF.EndWrite(writeResult); } } } DFI.CreationTimeUtc = SFI.CreationTimeUtc; DFI.LastWriteTimeUtc = SFI.LastWriteTimeUtc; } if (CS.Action == "Move") { if (System.IO.File.Exists(CS.Destination)) // IF DESTINATION ALREADY EXISTS { if (System.IO.File.Exists(CS.Destination + ".old")) // IF DESTINATION.OLD ALREADY EXISTS System.IO.File.Delete(CS.Destination + ".old"); // DELETE DESTINATION.OLD System.IO.File.Move(CS.Destination, CS.Destination + ".old"); //RENAME DESTINATION TO DESTINATION.OLD } System.IO.File.Move(CS.Source, CS.Destination); // MOVE FILE } if (CS.Action == "Delete") { System.IO.File.Delete(CS.Source); } } } } } }
No comments:
Post a Comment