programmatically creates a package that imports a text file into SQL Server, with a Flat File Source and an OLE-DB Destination. This shows how you can leverage the SSIS engine to write your own data import tool, for example — but beware: importing files is not as simple as it may seem.
When you build a similar package in the designer and select your file, you make some choices about the file format. It seems quite simple, but there is actually some quite complex thinking behind the scenes to parse the file and make suggestions on the file format for columns and their data types. This clever logic is actually in the user interface layer and not available to us when building packages in code, so we need to come up with our own method for specifying each and every column.
For this example we infer the file format from the destination table structure, which means the table must match the CSV exactly, but this easily allows us to have accurate data types and also gives us the all-important source-to-destination column mapping.
The finished package just has the one Data Flow Task shown below.
The code creates the package, configures the task and components, then saves the package to disk, useful for checking the package and testing, before finally executing. The destination table should be created in advance, although you could easily modify the code to add an Execute SQL Task to create the table as well. The script for the table is at the bottom along with a sample package and a copy of the code file.
When you build a similar package in the designer and select your file, you make some choices about the file format. It seems quite simple, but there is actually some quite complex thinking behind the scenes to parse the file and make suggestions on the file format for columns and their data types. This clever logic is actually in the user interface layer and not available to us when building packages in code, so we need to come up with our own method for specifying each and every column.
For this example we infer the file format from the destination table structure, which means the table must match the CSV exactly, but this easily allows us to have accurate data types and also gives us the all-important source-to-destination column mapping.
The finished package just has the one Data Flow Task shown below.
The code creates the package, configures the task and components, then saves the package to disk, useful for checking the package and testing, before finally executing. The destination table should be created in advance, although you could easily modify the code to add an Execute SQL Task to create the table as well. The script for the table is at the bottom along with a sample package and a copy of the code file.
namespace Konesans.Dts.Samples
{
    using System;
    using Microsoft.SqlServer.Dts.Runtime;
    using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
    using RuntimeWrapper = Microsoft.SqlServer.Dts.Runtime.Wrapper;
    using System.Data.SqlClient;

    /// <summary>
    /// Programmatically builds, saves (DEBUG only) and executes an SSIS package that
    /// imports a delimited flat file into a SQL Server table. The flat file column
    /// layout (names, delimiters and data types) is inferred from the destination
    /// table's metadata, so the table must match the file exactly.
    /// </summary>
    internal class FlatFileToSql
    {
        /// <summary>
        /// Creates the package: a Flat File connection, an OLE-DB connection, and a
        /// single Data Flow Task containing a Flat File Source wired to an OLE-DB
        /// Destination. The destination table must already exist.
        /// </summary>
        public void CreatePackage()
        {
            Package package = new Package();
            package.Name = "FlatFileToSql";

            // Define target server and database
            string server = "(local)";
            string database = "master";
            string targetTable = "OLEDBDestinationTable";

            // Add the Flat File connection
            ConnectionManager connectionManagerFlatFile = package.Connections.Add("FLATFILE");
            connectionManagerFlatFile.ConnectionString = @"C:\Temp\FlatFile.txt";
            connectionManagerFlatFile.Name = "FlatFile";
            connectionManagerFlatFile.Properties["Format"].SetValue(connectionManagerFlatFile, "Delimited");
            connectionManagerFlatFile.Properties["ColumnNamesInFirstDataRow"].SetValue(connectionManagerFlatFile, true);

            // Get native flat file connection so we can define columns directly
            RuntimeWrapper.IDTSConnectionManagerFlatFile90 connectionFlatFile =
                connectionManagerFlatFile.InnerObject as RuntimeWrapper.IDTSConnectionManagerFlatFile90;

            // Connect to SQL Server and examine metadata of the target table, but exclude
            // the extra Flat File FileNameColumnName (FileName) column, as that is added
            // by the source component, not read from the file.
            // FIX: connection, command and reader are wrapped in using blocks so the
            // SqlConnection is always closed, even if reading the metadata throws.
            using (SqlConnection connection = new SqlConnection(
                string.Format("Data Source={0};Initial Catalog={1};Integrated Security=SSPI;", server, database)))
            using (SqlCommand command = new SqlCommand(
                "SELECT name, xtype, length, scale, prec FROM sys.syscolumns " +
                "WHERE id = OBJECT_ID(@OBJECT_NAME) AND name <> 'FileName'", connection))
            {
                command.Parameters.Add(new SqlParameter("@OBJECT_NAME", targetTable));
                connection.Open();

                using (SqlDataReader reader = command.ExecuteReader())
                {
                    // Create one Flat File column per SQL column, matching its type
                    while (reader.Read())
                    {
                        RuntimeWrapper.IDTSConnectionManagerFlatFileColumn90 flatFileColumn =
                            connectionFlatFile.Columns.Add() as RuntimeWrapper.IDTSConnectionManagerFlatFileColumn90;
                        SetDtsColumnProperties(flatFileColumn, reader);
                    }
                }
            }

            // Check we have columns; an empty set means the target table was not found
            if (connectionFlatFile.Columns.Count == 0)
            {
                throw new ArgumentException(string.Format(
                    "No flat file columns have been created, " +
                    "check that the destination table '{0}' exists.", targetTable));
            }

            // Correct the last Flat File column delimiter, needs to be NewLine not Comma
            connectionFlatFile.Columns[connectionFlatFile.Columns.Count - 1].ColumnDelimiter = Environment.NewLine;

            // Add the SQL OLE-DB connection
            ConnectionManager connectionManagerOleDb = package.Connections.Add("OLEDB");
            connectionManagerOleDb.ConnectionString = string.Format(
                "Provider=SQLOLEDB.1;Data Source={0};Initial Catalog={1};Integrated Security=SSPI;", server, database);
            connectionManagerOleDb.Name = "OLEDB";

            // Add the Data Flow Task
            package.Executables.Add("STOCK:PipelineTask");

            // Get the task host wrapper, and the Data Flow task
            TaskHost taskHost = package.Executables[0] as TaskHost;
            MainPipe dataFlowTask = (MainPipe)taskHost.InnerObject;

            // Add Flat File source component
            IDTSComponentMetaData90 componentSource = dataFlowTask.ComponentMetaDataCollection.New();
            componentSource.Name = "FlatFileSource";
            componentSource.ComponentClassID = "DTSAdapter.FlatFileSource.1";

            // Get source design-time instance, and initialise component
            CManagedComponentWrapper instanceSource = componentSource.Instantiate();
            instanceSource.ProvideComponentProperties();

            // Set source connection
            componentSource.RuntimeConnectionCollection[0].ConnectionManagerID = connectionManagerFlatFile.ID;
            componentSource.RuntimeConnectionCollection[0].ConnectionManager =
                DtsConvert.ToConnectionManager90(connectionManagerFlatFile);

            // Set the source properties, optional, we use the extra FileNameColumnName column property
            instanceSource.SetComponentProperty("FileNameColumnName", "FileName");

            // Reinitialize the metadata, generating output columns from the flat file columns
            instanceSource.AcquireConnections(null);
            instanceSource.ReinitializeMetaData();
            instanceSource.ReleaseConnections();

            // Add OLE-DB destination
            IDTSComponentMetaData90 componentDestination = dataFlowTask.ComponentMetaDataCollection.New();
            componentDestination.Name = "OLEDBDestination";
            componentDestination.ComponentClassID = "DTSAdapter.OLEDBDestination.1";

            // Get destination design-time instance, and initialise component
            CManagedComponentWrapper instanceDestination = componentDestination.Instantiate();
            instanceDestination.ProvideComponentProperties();

            // Set destination connection
            componentDestination.RuntimeConnectionCollection[0].ConnectionManagerID = connectionManagerOleDb.ID;
            componentDestination.RuntimeConnectionCollection[0].ConnectionManager =
                DtsConvert.ToConnectionManager90(connectionManagerOleDb);

            // Set destination table name
            instanceDestination.SetComponentProperty("OpenRowset", targetTable);

            // Connect the source output to the destination input
            IDTSPath90 path = dataFlowTask.PathCollection.New();
            path.AttachPathAndPropagateNotifications(componentSource.OutputCollection[0],
                componentDestination.InputCollection[0]);

            // Get input and virtual input for destination to select and map columns
            IDTSInput90 destinationInput = componentDestination.InputCollection[0];
            IDTSVirtualInput90 destinationVirtualInput = destinationInput.GetVirtualInput();
            IDTSVirtualInputColumnCollection90 destinationVirtualInputColumns =
                destinationVirtualInput.VirtualInputColumnCollection;

            // Reinitialize the metadata, generating external columns from flat file columns.
            // If errors are raised here, it is most likely because the flat file connection
            // columns are wrong, which itself is probably because the template table does
            // not match the file.
            instanceDestination.AcquireConnections(null);
            instanceDestination.ReinitializeMetaData();
            instanceDestination.ReleaseConnections();

            // Select and map destination columns
            foreach (IDTSVirtualInputColumn90 virtualInputColumn in destinationVirtualInputColumns)
            {
                // Select column, and retain new input column
                IDTSInputColumn90 inputColumn = instanceDestination.SetUsageType(destinationInput.ID,
                    destinationVirtualInput, virtualInputColumn.LineageID, DTSUsageType.UT_READONLY);

                // Find external column by name
                IDTSExternalMetadataColumn90 externalColumn =
                    destinationInput.ExternalMetadataColumnCollection[inputColumn.Name];

                // Map input column to external column
                instanceDestination.MapInputColumn(destinationInput.ID, inputColumn.ID, externalColumn.ID);
            }

#if DEBUG
            // Save package to disk, DEBUG only
            new Application().SaveToXml(String.Format(@"C:\Temp\{0}.dtsx", package.Name), package, null);
            Console.WriteLine(@"C:\Temp\{0}.dtsx", package.Name);
#endif

            package.Execute();

            // Report any errors collected during execution
            foreach (DtsError error in package.Errors)
            {
                Console.WriteLine("ErrorCode       : {0}", error.ErrorCode);
                Console.WriteLine("  SubComponent  : {0}", error.SubComponent);
                Console.WriteLine("  Description   : {0}", error.Description);
            }

            package.Dispose();
        }

        /// <summary>
        /// Configures a single flat file column (delimiter, SSIS data type, width,
        /// precision/scale and name) from the current row of the syscolumns reader.
        /// The xtype values are SQL Server 2005 system type ids.
        /// </summary>
        /// <param name="flatFileColumn">The flat file column to configure.</param>
        /// <param name="reader">Reader positioned on a syscolumns row (name, xtype, length, scale, prec).</param>
        private void SetDtsColumnProperties(
            RuntimeWrapper.IDTSConnectionManagerFlatFileColumn90 flatFileColumn, SqlDataReader reader)
        {
            flatFileColumn.ColumnType = "Delimited";
            flatFileColumn.ColumnDelimiter = ",";

            switch (Convert.ToInt16(reader["xtype"]))
            {
                case 104: // DT_BOOL bit
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_BOOL;
                    break;
                case 173: // DT_BYTES binary, varbinary, timestamp
                case 165:
                case 189:
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_BYTES;
                    flatFileColumn.ColumnWidth = Convert.ToInt32(reader["length"]);
                    break;
                case 60: // DT_CY smallmoney, money
                case 122:
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_CY;
                    flatFileColumn.DataPrecision = Convert.ToInt32(reader["prec"]);
                    // FIX: Convert.ToInt32 instead of a direct (int) unbox, which throws
                    // InvalidCastException if the provider returns a non-int integral type.
                    flatFileColumn.DataScale = Convert.ToInt32(reader["scale"]);
                    break;
                case 61: // DT_DBTIMESTAMP datetime, smalldatetime
                case 58:
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_DBTIMESTAMP;
                    break;
                case 36: // DT_GUID uniqueidentifier
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_GUID;
                    break;
                case 52: // DT_I2 smallint
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_I2;
                    break;
                case 56: // DT_I4 int
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_I4;
                    break;
                case 127: // DT_I8 bigint
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_I8;
                    break;
                case 106: // DT_NUMERIC decimal, numeric
                case 108:
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_NUMERIC;
                    flatFileColumn.DataPrecision = Convert.ToInt32(reader["prec"]);
                    flatFileColumn.DataScale = Convert.ToInt32(reader["scale"]);
                    break;
                case 59: // DT_R4 real
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_R4;
                    break;
                case 62: // DT_R8 float
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_R8;
                    break;
                case 175: // DT_STR char, varchar
                case 167:
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_STR;
                    flatFileColumn.ColumnWidth = Convert.ToInt32(reader["length"]);
                    break;
                case 48: // DT_UI1 tinyint
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_UI1;
                    break;
                case 239: // DT_WSTR nchar, nvarchar, sql_variant, xml
                case 231:
                case 98:
                case 241:
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_WSTR;
                    // syscolumns reports nchar/nvarchar length in bytes; halve for characters
                    flatFileColumn.ColumnWidth = Convert.ToInt32(reader["length"]) / 2;
                    break;
                case 34: // DT_IMAGE image
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_IMAGE;
                    break;
                case 99: // DT_NTEXT ntext
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_NTEXT;
                    break;
                case 35: // DT_TEXT text
                    flatFileColumn.DataType = RuntimeWrapper.DataType.DT_TEXT;
                    break;
            }

            // Name the column via IDTSName90; the column object itself has no Name property
            RuntimeWrapper.IDTSName90 columnName = flatFileColumn as RuntimeWrapper.IDTSName90;
            columnName.Name = reader["name"].ToString();
        }
    }
}
// Sample package, code file, and destination table script.
No comments:
Post a Comment