Pages

Wednesday, January 4, 2012

Creating packages in code - Flat File Source to OLE-DB Destination (SQL Server)



This sample programmatically creates a package that imports a text file into SQL Server, using a Flat File Source and an OLE-DB Destination. It shows how you can leverage the SSIS engine to write your own data import tool, for example, but beware: importing files is not as simple as it may seem.
When you build a similar package in the designer and select your file, you make some choices about the file format. It seems quite simple, but there is actually some quite complex thinking behind the scenes to parse the file and make suggestions on the file format for columns and their data types. This clever logic is actually in the user interface layer and not available to us when building packages in code, so we need to come up with our own method for specifying each and every column.
For this example we infer the file format from the destination table structure, which means the table must match the CSV exactly, but this easily allows us to have accurate data types and also gives us that all-important source to destination column mapping.
The finished package just has the one Data Flow Task shown below.
FlatFileToSql Package

The code creates the package, configures the task and components, then saves the package to disk, useful for checking the package and testing, before finally executing. The destination table should be created in advance, although you could easily modify the code to add an Execute SQL Task to create the table as well. The script for the table is at the bottom along with a sample package and a copy of the code file.
namespace Konesans.Dts.Samples
{
    using System;
    using Microsoft.SqlServer.Dts.Runtime;
    using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
    using RuntimeWrapper = Microsoft.SqlServer.Dts.Runtime.Wrapper;
    using System.Data.SqlClient;

    /// <summary>
    /// Builds, optionally saves, and executes an SSIS package containing a single Data Flow Task
    /// that loads a comma-delimited flat file into a SQL Server table through an OLE-DB destination.
    /// The flat file column layout is inferred from the metadata of the destination table.
    /// </summary>
    internal class FlatFileToSql
    {
        /// <summary>
        /// Creates the FlatFileToSql package in memory, saves it to C:\Temp in DEBUG builds,
        /// executes it, and writes any package errors to the console.
        /// </summary>
        /// <exception cref="ArgumentException">
        /// No columns could be read for the destination table (for example, it does not exist).
        /// </exception>
        public void CreatePackage()
        {
            // Define target server and database
            string server = "(local)";
            string database = "master";
            string targetTable = "OLEDBDestinationTable";

            Package package = new Package();
            try
            {
                package.Name = "FlatFileToSql";

                // Connections: flat file first (its columns come from the target table), then OLE-DB
                ConnectionManager connectionManagerFlatFile =
                    AddFlatFileConnection(package, server, database, targetTable);
                ConnectionManager connectionManagerOleDb = AddOleDbConnection(package, server, database);

                // Add the Data Flow Task; Add returns the new executable, so there is no need to
                // look it up by index afterwards
                TaskHost taskHost = (TaskHost)package.Executables.Add("STOCK:PipelineTask");
                MainPipe dataFlowTask = (MainPipe)taskHost.InnerObject;

                // Build the pipeline: source -> destination
                IDTSComponentMetaData90 componentSource =
                    AddFlatFileSource(dataFlowTask, connectionManagerFlatFile);
                AddOleDbDestination(dataFlowTask, connectionManagerOleDb, componentSource, targetTable);

                #if DEBUG
                // Save package to disk, DEBUG only
                new Application().SaveToXml(String.Format(@"C:\Temp\{0}.dtsx", package.Name), package, null);
                Console.WriteLine(@"C:\Temp\{0}.dtsx", package.Name);
                #endif

                package.Execute();

                foreach (DtsError error in package.Errors)
                {
                    Console.WriteLine("ErrorCode       : {0}", error.ErrorCode);
                    Console.WriteLine("  SubComponent  : {0}", error.SubComponent);
                    Console.WriteLine("  Description   : {0}", error.Description);
                }
            }
            finally
            {
                // Release the package even when building or executing it throws
                package.Dispose();
            }
        }

        /// <summary>
        /// Adds the delimited Flat File connection manager to the package and defines its columns
        /// from the destination table metadata.
        /// </summary>
        private ConnectionManager AddFlatFileConnection(Package package, string server, string database,
            string targetTable)
        {
            ConnectionManager connectionManagerFlatFile = package.Connections.Add("FLATFILE");
            connectionManagerFlatFile.ConnectionString = @"C:\Temp\FlatFile.txt";
            connectionManagerFlatFile.Name = "FlatFile";
            connectionManagerFlatFile.Properties["Format"].SetValue(connectionManagerFlatFile, "Delimited");
            connectionManagerFlatFile.Properties["ColumnNamesInFirstDataRow"].SetValue(connectionManagerFlatFile, true);

            // Get native flat file connection; a direct cast fails immediately with
            // InvalidCastException instead of a later NullReferenceException
            RuntimeWrapper.IDTSConnectionManagerFlatFile90 connectionFlatFile =
                (RuntimeWrapper.IDTSConnectionManagerFlatFile90)connectionManagerFlatFile.InnerObject;

            AddFlatFileColumns(connectionFlatFile, server, database, targetTable);

            return connectionManagerFlatFile;
        }

        /// <summary>
        /// Reads the column metadata of <paramref name="targetTable"/> from SQL Server and creates
        /// one matching flat file column per table column, in table column order.
        /// </summary>
        /// <exception cref="ArgumentException">No columns were returned for the table.</exception>
        private void AddFlatFileColumns(RuntimeWrapper.IDTSConnectionManagerFlatFile90 connectionFlatFile,
            string server, string database, string targetTable)
        {
            // Connect to SQL server and examine metadata of target table, but must exclude
            // extra Flat File FileNameColumnName (FileName) column as that is added by source.
            // ORDER BY colid makes the flat file columns follow the table's column order; without
            // it the row order of the query is not guaranteed.
            using (SqlConnection connection = new SqlConnection(
                string.Format("Data Source={0};Initial Catalog={1};Integrated Security=SSPI;", server, database)))
            using (SqlCommand command = new SqlCommand(
                "SELECT name, xtype, length, scale, prec FROM sys.syscolumns " +
                "WHERE id = OBJECT_ID(@OBJECT_NAME) AND name <> 'FileName' " +
                "ORDER BY colid", connection))
            {
                command.Parameters.Add(new SqlParameter("@OBJECT_NAME", targetTable));
                connection.Open();

                using (SqlDataReader reader = command.ExecuteReader())
                {
                    // Create Flat File columns based on SQL columns
                    while (reader.Read())
                    {
                        // Create Flat File column to match SQL target column
                        RuntimeWrapper.IDTSConnectionManagerFlatFileColumn90 flatFileColumn =
                            (RuntimeWrapper.IDTSConnectionManagerFlatFileColumn90)connectionFlatFile.Columns.Add();
                        SetDtsColumnProperties(flatFileColumn, reader);
                    }
                }
            }

            // Check we have columns
            if (connectionFlatFile.Columns.Count == 0)
            {
                throw new ArgumentException(string.Format("No flat file columns have been created, " +
                    "check that the destination table '{0}' exists.", targetTable));
            }

            // Correct the last Flat File column delimiter, needs to be NewLine not Comma
            connectionFlatFile.Columns[connectionFlatFile.Columns.Count - 1].ColumnDelimiter = Environment.NewLine;
        }

        /// <summary>
        /// Adds the SQL Server OLE-DB connection manager to the package.
        /// </summary>
        private ConnectionManager AddOleDbConnection(Package package, string server, string database)
        {
            ConnectionManager connectionManagerOleDb = package.Connections.Add("OLEDB");
            connectionManagerOleDb.ConnectionString = string.Format(
                "Provider=SQLOLEDB.1;Data Source={0};Initial Catalog={1};Integrated Security=SSPI;", server, database);
            connectionManagerOleDb.Name = "OLEDB";
            return connectionManagerOleDb;
        }

        /// <summary>
        /// Adds a Flat File source component to the data flow, binds it to the flat file
        /// connection and initialises its metadata.
        /// </summary>
        /// <returns>The metadata of the new source component.</returns>
        private IDTSComponentMetaData90 AddFlatFileSource(MainPipe dataFlowTask,
            ConnectionManager connectionManagerFlatFile)
        {
            IDTSComponentMetaData90 componentSource = dataFlowTask.ComponentMetaDataCollection.New();
            componentSource.Name = "FlatFileSource";
            componentSource.ComponentClassID = "DTSAdapter.FlatFileSource.1";

            // Get source design-time instance, and initialise component
            CManagedComponentWrapper instanceSource = componentSource.Instantiate();
            instanceSource.ProvideComponentProperties();

            // Set source connection
            componentSource.RuntimeConnectionCollection[0].ConnectionManagerID = connectionManagerFlatFile.ID;
            componentSource.RuntimeConnectionCollection[0].ConnectionManager =
                DtsConvert.ToConnectionManager90(connectionManagerFlatFile);

            // Set the source properties, optional, we use the extra FileNameColumnName column property
            instanceSource.SetComponentProperty("FileNameColumnName", "FileName");

            // Reinitialize the metadata now that the connection and properties are set
            instanceSource.AcquireConnections(null);
            instanceSource.ReinitializeMetaData();
            instanceSource.ReleaseConnections();

            return componentSource;
        }

        /// <summary>
        /// Adds an OLE-DB destination component targeting <paramref name="targetTable"/>, connects
        /// it to the first output of <paramref name="componentSource"/>, and maps every upstream
        /// column to the destination column of the same name.
        /// </summary>
        private void AddOleDbDestination(MainPipe dataFlowTask, ConnectionManager connectionManagerOleDb,
            IDTSComponentMetaData90 componentSource, string targetTable)
        {
            IDTSComponentMetaData90 componentDestination = dataFlowTask.ComponentMetaDataCollection.New();
            componentDestination.Name = "OLEDBDestination";
            componentDestination.ComponentClassID = "DTSAdapter.OLEDBDestination.1";

            // Get destination design-time instance, and initialise component
            CManagedComponentWrapper instanceDestination = componentDestination.Instantiate();
            instanceDestination.ProvideComponentProperties();

            // Set destination connection
            componentDestination.RuntimeConnectionCollection[0].ConnectionManagerID = connectionManagerOleDb.ID;
            componentDestination.RuntimeConnectionCollection[0].ConnectionManager =
                DtsConvert.ToConnectionManager90(connectionManagerOleDb);

            // Set destination table name
            instanceDestination.SetComponentProperty("OpenRowset", targetTable);

            // Connect source output to destination input
            IDTSPath90 path = dataFlowTask.PathCollection.New();
            path.AttachPathAndPropagateNotifications(componentSource.OutputCollection[0],
                componentDestination.InputCollection[0]);

            // Get input and virtual input for destination to select and map columns
            IDTSInput90 destinationInput = componentDestination.InputCollection[0];
            IDTSVirtualInput90 destinationVirtualInput = destinationInput.GetVirtualInput();
            IDTSVirtualInputColumnCollection90 destinationVirtualInputColumns =
                destinationVirtualInput.VirtualInputColumnCollection;

            // Reinitialize the destination metadata so its external metadata columns (looked up
            // by name below) are available.
            // If errors are raised here, it is most likely because the flat file connection columns
            // are wrong, which itself is probably because the template table does not match the file.
            instanceDestination.AcquireConnections(null);
            instanceDestination.ReinitializeMetaData();
            instanceDestination.ReleaseConnections();

            // Select and map destination columns
            foreach (IDTSVirtualInputColumn90 virtualInputColumn in destinationVirtualInputColumns)
            {
                // Select column, and retain new input column
                IDTSInputColumn90 inputColumn = instanceDestination.SetUsageType(destinationInput.ID,
                    destinationVirtualInput, virtualInputColumn.LineageID, DTSUsageType.UT_READONLY);
                // Find external column by name
                IDTSExternalMetadataColumn90 externalColumn =
                    destinationInput.ExternalMetadataColumnCollection[inputColumn.Name];
                // Map input column to external column
                instanceDestination.MapInputColumn(destinationInput.ID, inputColumn.ID, externalColumn.ID);
            }
        }

        /// <summary>
        /// Configures a flat file column (comma delimited) so that its name, data type and
        /// size/precision match the SQL column described by the current row of
        /// <paramref name="reader"/>.
        /// </summary>
        /// <param name="flatFileColumn">The newly added flat file column to configure.</param>
        /// <param name="reader">
        /// Reader positioned on a row exposing name, xtype, length, scale and prec columns.
        /// </param>
        private void SetDtsColumnProperties(RuntimeWrapper.IDTSConnectionManagerFlatFileColumn90 flatFileColumn,
            SqlDataReader reader)
        {
            flatFileColumn.ColumnType = "Delimited";
            flatFileColumn.ColumnDelimiter = ",";

            // Map the SQL Server system type id (xtype) to the corresponding SSIS data type.
            // NOTE(review): an xtype not listed here is silently skipped and leaves DataType at
            // whatever Columns.Add() initialised it to - confirm whether that should be an error.
            switch (Convert.ToInt16(reader["xtype"]))
            {
                case 104:   // bit
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_BOOL;
                    break;

                case 173:   // binary
                case 165:   // varbinary
                case 189:   // timestamp
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_BYTES;
                    flatFileColumn.ColumnWidth = Convert.ToInt32(reader["length"]);
                    break;

                case 60:    // money
                case 122:   // smallmoney
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_CY;
                    flatFileColumn.DataPrecision = Convert.ToInt32(reader["prec"]);
                    flatFileColumn.DataScale = (int)reader["scale"];
                    break;

                case 61:    // datetime
                case 58:    // smalldatetime
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_DBTIMESTAMP;
                    break;

                case 36:    // uniqueidentifier
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_GUID;
                    break;

                case 52:    // smallint
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_I2;
                    break;

                case 56:    // int
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_I4;
                    break;

                case 127:   // bigint
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_I8;
                    break;

                case 106:   // decimal
                case 108:   // numeric
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_NUMERIC;
                    flatFileColumn.DataPrecision = Convert.ToInt32(reader["prec"]);
                    flatFileColumn.DataScale = (int)reader["scale"];
                    break;

                case 59:    // real
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_R4;
                    break;

                case 62:    // float
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_R8;
                    break;

                case 175:   // char
                case 167:   // varchar
                    // NOTE(review): varchar(max) presumably reports a non-positive length here;
                    // confirm how such columns should be sized.
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_STR;
                    flatFileColumn.ColumnWidth = Convert.ToInt32(reader["length"]);
                    break;

                case 48:    // tinyint
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_UI1;
                    break;

                case 239:   // nchar
                case 231:   // nvarchar
                case 98:    // sql_variant
                case 241:   // xml
                    // Length is halved to turn a byte count into a count of two-byte characters
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_WSTR;
                    flatFileColumn.ColumnWidth = Convert.ToInt32(reader["length"]) / 2;
                    break;

                case 34:    // image
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_IMAGE;
                    break;

                case 99:    // ntext
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_NTEXT;
                    break;

                case 35:    // text
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_TEXT;
                    break;
            }

            // The column name is exposed through the separate IDTSName90 interface
            RuntimeWrapper.IDTSName90 columnName = (RuntimeWrapper.IDTSName90)flatFileColumn;
            columnName.Name = reader["name"].ToString();
        }
    }
}
Sample package, code file, and destination table script.

No comments:

Post a Comment