Monday, August 29, 2011

Streaming SQL Server VARBINARY(MAX) In Chunks With ADO.NET

A few years ago it was unthinkable to store BLOB data such as large documents directly inside a database. The most common approach was to save the data to the file system and store only the UNC path in the database. Today, storing BLOB data directly inside a database has become a common requirement in many projects. This way of saving binary data brings some advantages and drawbacks to think about.

Probably the most important advantage is security. If BLOB data is saved in the file system, we not only have to manage security at the database level but also restrict access to the file system to prevent unauthorized access. Another advantage is integrity. A UNC file path stored in a database easily becomes invalid if someone deletes the file. This can happen through malware attacks, but also through bugs in software that changes one side but not the other. A file name doesn't provide any way to enforce referential integrity. Sometimes it is tricky to keep things synchronized, especially when working with transactions.

On the other hand, there is one particular drawback when storing BLOBs inside the database. If the data is really large, say larger than 10MB or 100MB, it becomes tricky to save and read it without allocating huge amounts of memory on the server and the client. While smaller data can easily be pushed or pulled in one chunk, we don't want the client or server to handle 100MB as one big block of memory.

Missing Class in ADO.NET Framework?

On SQL Server 2008, when working with VARBINARY(MAX) based on FILESTREAM, we can use the ADO.NET SqlFileStream class to stream data from and to SQL Server. Unfortunately, this class does not work if the data is stored inside a regular VARBINARY(MAX) column, and not everyone wants to activate FILESTREAM, among other reasons because of the integrity problem.

SqlBinaryData Class

To be able to stream data stored in a VARBINARY(MAX) column, I wrote a helper class called SqlBinaryData that provides a simple but powerful interface.
public class SqlBinaryData : IBinaryData {
   //...
   
   /// <summary>
   /// Creates a new readable instance of a <see cref="Stream"/> for the BLOB column
   /// </summary>
   /// <returns>A readable stream.</returns>
   public Stream OpenRead() {
      return new SqlBinaryReader( // ...
   }

   /// <summary>
   /// Creates a new writable instance of a <see cref="Stream"/> for the BLOB column
   /// </summary>
   /// <returns>A writable stream.</returns>
   public Stream OpenWrite(bool append) {
      return new SqlBinaryWriter( // ...
   }

   // ...
}

Since the class works with four additional internal classes (about 700 lines of code), I will only focus on the most important parts here. You can find a ZIP archive containing the whole source code and my unit tests at the end of this post.

The OpenRead method creates an instance of the internal SqlBinaryReader class, which implements a readable System.IO.Stream, and returns it to the consumer. The OpenWrite method creates an instance of the internal SqlBinaryWriter class, which implements a writable System.IO.Stream, and returns it. The other two classes, SqlBinaryInfo and SqlBinaryMetaData, are primarily used to provide database connections, guard against SQL injection and provide metadata.

Metadata Evaluation And Caching

Metadata, such as the primary key column and table names, is retrieved from SQL Server when first needed, which is the first time a stream is read or written. Until then no database connection is opened, to avoid unnecessary resource allocation and database utilization.

All metadata is cached for the current application domain. If an instance of SqlBinaryData needs database metadata that has already been retrieved for the same table, binary column and connection string, this information is reused from an internal cache. If metadata for a not yet cached binary column is requested from two threads, it is possible that one thread is suspended until the metadata has been loaded by the other thread. This is unlikely to cause any problems but should be known.
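
The caching behavior described above could be sketched roughly as follows. This is not the code from the attached archive; the names BlobMetaData, MetaDataCache and GetOrLoad are made up for illustration, and the actual loading is passed in as a delegate so that the sketch stays independent of a database.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the cached metadata (illustrative fields only)
public sealed class BlobMetaData {
   public string QualifiedTableName;
   public string PkColumn;
   public string BinaryColumn;
}

public static class MetaDataCache {
   private static readonly object _sync = new object();
   private static readonly Dictionary<string, BlobMetaData> _cache =
      new Dictionary<string, BlobMetaData>();

   // Returns cached metadata or loads it once through the supplied delegate.
   // A second thread asking for the same key waits on the lock meanwhile.
   public static BlobMetaData GetOrLoad(string connectionString, string table,
         string column, Func<BlobMetaData> load) {
      string key = connectionString + "\0" + table + "\0" + column;
      lock (_sync) {
         BlobMetaData result;
         if (!_cache.TryGetValue(key, out result)) {
            result = load();
            _cache.Add(key, result);
         }
         return result;
      }
   }

   // Forces a reload on the next request, e.g. after a schema change
   public static void Clear() {
      lock (_sync) {
         _cache.Clear();
      }
   }
}
```

Holding one lock while loading is the simplest option and matches the "one thread waits" behavior, at the price of also blocking requests for other, unrelated columns during a load.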

If a database schema might change over the lifetime of a process and the primary key column changes its name or type, it is possible to clear the internal cache by calling the static method SqlBinaryData.ClearMetaDataCache(). A reading or writing stream that is already active while the cache is cleared keeps working with its previously requested metadata, which might cause an error. Any newly created stream, even one from an already existing SqlBinaryData instance, causes the metadata to be reloaded from the database.

Metadata is retrieved from the SQL Server INFORMATION_SCHEMA views, so it should be accessible with the most common database user rights.

Table and schema names are taken from, and thereby validated against, the INFORMATION_SCHEMA.TABLES view.
-- If table schema is specified
SELECT QUOTENAME(TABLE_NAME), QUOTENAME(TABLE_SCHEMA), TABLE_SCHEMA
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_NAME = @tableName
   AND TABLE_SCHEMA = @schemaName

-- If table schema is not specified
SELECT QUOTENAME(TABLE_NAME), QUOTENAME(TABLE_SCHEMA), TABLE_SCHEMA
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_NAME = @tableName
   AND TABLE_SCHEMA = OBJECT_SCHEMA_NAME(OBJECT_ID(@tableName))
Primary key column information is taken from INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE view.
SELECT QUOTENAME(COLUMN_NAME)
FROM INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE
WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName
AND OBJECTPROPERTY(OBJECT_ID(QUOTENAME(CONSTRAINT_SCHEMA)
         + '.' + QUOTENAME(CONSTRAINT_NAME))
      ,'IsPrimaryKey') = 1
If no primary key was found an exception will be thrown. If the primary key consists of more than one column an exception will be thrown since this version of SqlBinaryData class does not support composite primary keys.

Binary column information is taken from INFORMATION_SCHEMA.COLUMNS view.
SELECT QUOTENAME(COLUMN_NAME), DATA_TYPE, CHARACTER_MAXIMUM_LENGTH
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = @tableName
   AND TABLE_SCHEMA = @schemaName
   AND COLUMN_NAME = @columnName
If the returned column is not of type VARBINARY(MAX) an exception will be thrown.

SQL Injection

To avoid the possibility of SQL injection, all provided table and column names are quoted with the SQL Server QUOTENAME function, as shown above. All values, such as the primary key value that identifies the row to stream binary data from or to, are passed to SQL Server as SqlParameter objects to avoid SQL injection and let SQL Server reuse cached execution plans.
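
To illustrate what the quoting does, here is a small client-side approximation of QUOTENAME with its default bracket delimiter. The class itself lets SQL Server do the quoting, so the helper below (SqlIdentifier.Quote is a made-up name) is only a sketch for understanding the result, not part of the attached source.

```csharp
using System;

public static class SqlIdentifier {
   // Approximates T-SQL QUOTENAME(name) with the default '[' ']' delimiters
   public static string Quote(string name) {
      if (name == null)
         throw new ArgumentNullException("name");
      // QUOTENAME returns NULL for input longer than 128 characters;
      // this sketch throws instead
      if (name.Length > 128)
         throw new ArgumentException("Identifier is too long.", "name");
      // a closing bracket inside the name is escaped by doubling it
      return "[" + name.Replace("]", "]]") + "]";
   }
}
```

A name like `Data] = 0x; DROP TABLE TestBlob; --` ends up as one (non-existing) bracketed identifier instead of executable SQL.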

SqlBinaryReader Class

This internal class is responsible for streaming read access to the BLOB. As already noted, it implements System.IO.Stream to provide well-known standard behavior.
class SqlBinaryReader : Stream { //...
The most important public method is the Read method, which copies data into a specified array:
public override int Read(byte[] buffer, int offset, int count) {
   // first read
   if (_buffer == null)
      ReadChunk();

   int done = 0;

   while (count != done) {
      // read buffered data into provided buffer
      done += ReadInternal(buffer, offset + done, count - done);
      // end of DB data reached
      if (_buffer.Length < _info.BufferSize)
         break;
      // read next chunk from database if needed
      if (done < count)
         ReadChunk();
   }

   return done;
}
The called method ReadInternal copies data from the internal buffer into the specified one. The ReadChunk method handles the database access and is shown here:
private void ReadChunk() {
   SqlBinaryMetaData metaData = GetMetaData();
   // create an internal database connection if not yet available
   if (_cn == null)
      _cn = _info.CreateConnection(GetMetaData().ConnectionString);

   // create an internal data reader
   if (_reader == null) {
      string sql =
         string.Format("SELECT {0} FROM {1} WHERE {2} = @pk",
                        metaData.BinaryColumn,
                        metaData.QualifiedTableName,
                        metaData.PkColumn);
      using (var cmd = new SqlCommand(sql, _cn)) {
         cmd.Parameters.Add(_info.CreatePkParam());
         // open the reader with sequential access behavior to enable
         // streaming data from database
         _reader = cmd.ExecuteReader(CommandBehavior.SequentialAccess);
         _reader.Read();
      }
   }

   int read = (int)_reader.GetBytes(0, _position, _buffer, 0, _buffer.Length);
   if (read != _buffer.Length)
      Array.Resize(ref _buffer, read);
   _offset = 0;
}
The internal SqlDataReader instance is created with the option CommandBehavior.SequentialAccess, which causes ADO.NET to stream data out of SQL Server instead of pulling all data in one chunk. The reader and its parent connection are released when the stream instance is disposed.

Data is always streamed with the specified buffer size, even if the caller requests a larger number of bytes in one call. This avoids excessive memory utilization on the server side. It is still possible to get 10MB of data in one single call of the Read method, even though this causes 10 database calls if the buffer size is set to 1MB.
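
The relation between the requested byte count and the number of chunk fetches is a simple ceiling division. The helper below is only an illustration of that arithmetic (ChunkMath and FetchCount are made-up names, not part of the class), and it ignores a possible extra fetch that merely detects the end of the data.

```csharp
using System;

public static class ChunkMath {
   // Number of buffer-sized fetches needed to deliver byteCount bytes
   public static long FetchCount(long byteCount, int bufferSize) {
      if (byteCount < 0)
         throw new ArgumentOutOfRangeException("byteCount");
      if (bufferSize <= 0)
         throw new ArgumentOutOfRangeException("bufferSize");
      // integer ceiling division
      return (byteCount + bufferSize - 1) / bufferSize;
   }
}
```

For 10MB requested with a 1MB buffer this gives 10 fetches; a single additional byte would require an 11th.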

SqlBinaryWriter Class

This internal class is responsible for streaming write access to a VARBINARY(MAX) column of a table row. Like SqlBinaryReader, it implements System.IO.Stream.
class SqlBinaryWriter : Stream {
The two important public methods of the write stream are the Write and Flush methods.

The Write method copies the provided data in chunks into the internal buffer. Whenever the internal buffer is full, it calls the Flush method, which is responsible for writing the current chunk of data to the server.
public override void Write(byte[] buffer, int offset, int count) {
   if (_failedState)
      throw new InvalidOperationException("Stream is in a failed state");

   int done = 0;
   while (done != count) {
      int chunk = Math.Min(_internalBuffer.Length - _internalOffset, count - done);
      // push a chunk of bytes into the internal buffer
      Array.Copy(buffer, offset + done, _internalBuffer, _internalOffset, chunk);
      _internalOffset += chunk;
      // if internal buffer is full, flush to database
      if (_internalOffset == _internalBuffer.Length)
         Flush();
      done += chunk;
   }
}
To avoid unneeded memory copying on the server side, new data is appended to the database field by using the VARBINARY(MAX).WRITE method (see UPDATE (Transact-SQL)), which performs a partial update of the existing data instead of a complete reallocation of all data.

public override void Flush() {
   if (_internalOffset == 0)
      return;
   if (_failedState)
      return;

   SqlBinaryMetaData metaData = GetMetaData();

   using (SqlConnection cn = _info.CreateConnection(metaData.ConnectionString))
   using (var tran = cn.BeginTransaction()) {
      try {
         // handle NULL value and "append" configuration
         PrepareValue(cn, tran);
         // UPDATE SchemaName.TableName 
         // SET BinaryColumn.WRITE(@buffer, @offset, @count) 
         // WHERE PkColumn = @pk
         string sql =
            string.Format("UPDATE {0} SET {1}.WRITE(@buffer, @offset, @count) WHERE {2} = @pk",
                          metaData.QualifiedTableName,
                          metaData.BinaryColumn,
                          metaData.PkColumn);
         using (var cmd = new SqlCommand(sql, cn)) {
            cmd.Transaction = tran;

            var bufferParam = cmd.Parameters.Add("@buffer", SqlDbType.VarBinary, _info.BufferSize);
            var offsetParam = cmd.Parameters.Add("@offset", SqlDbType.Int);
            var countParam = cmd.Parameters.Add("@count", SqlDbType.Int);
            cmd.Parameters.Add(_info.CreatePkParam());

            byte[] buffer;
            if (_internalOffset == _internalBuffer.Length)
               buffer = _internalBuffer;
            else {
               // avoid sending unneeded bytes over the network
               buffer = new byte[_internalOffset];
               Array.Copy(_internalBuffer, buffer, _internalOffset);
            }

            bufferParam.Value = buffer;
            // VARBINARY(MAX).WRITE works with a zero based index
            offsetParam.Value = _position;
            countParam.Value = _internalOffset;
            // write chunk
            int affected = cmd.ExecuteNonQuery();
            _info.AssertOneRowAffected(affected);
            _position += _internalOffset;
            _internalOffset = 0;
         }
         tran.Commit();
      }
      catch {
         _failedState = true;
         tran.Rollback();
         throw;
      }
   }
}
The PrepareValue method, called from Flush, prepares the database field on the first call. If the stream was created with the option "append" set to false, it resets any existing data to the initial value "0x". If the stream was created with "append" set to true, it determines the current length of the data in the binary field, or sets the field to "0x" if its value is currently NULL.

How To Use

Here is an example that shows how to use the class to write and read data in a streaming manner.
SqlBinaryData data = 
   SqlBinaryData.CreateLongPrimaryKey(ConnectionString, "TestBlob", "Data", 3L, 5);

byte[] expected = Guid.NewGuid().ToByteArray();
using (var writer = data.OpenWrite(false)) {
   writer.Write(expected, 0, 4);
   writer.Write(expected, 4, expected.Length - 4);
}

byte[] actual = new byte[expected.Length];
using (var reader = data.OpenRead()) {
   reader.Read(actual, 0, 3);
   reader.Read(actual, 3, actual.Length - 3);
}

Assert.IsTrue(expected.SequenceEqual(actual));
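
Because OpenRead and OpenWrite return plain System.IO.Stream instances, larger payloads can be moved with an ordinary copy loop. The helper below is a minimal sketch that works with any pair of streams; StreamCopy and CopyInChunks are made-up names and not part of the attached source.

```csharp
using System;
using System.IO;

public static class StreamCopy {
   // Copies source to target using one reusable buffer of bufferSize bytes
   // and returns the total number of bytes copied.
   public static long CopyInChunks(Stream source, Stream target, int bufferSize) {
      if (source == null)
         throw new ArgumentNullException("source");
      if (target == null)
         throw new ArgumentNullException("target");
      if (bufferSize <= 0)
         throw new ArgumentOutOfRangeException("bufferSize");

      byte[] buffer = new byte[bufferSize];
      long total = 0;
      int read;
      // Read may return fewer bytes than requested; 0 means end of stream
      while ((read = source.Read(buffer, 0, buffer.Length)) > 0) {
         target.Write(buffer, 0, read);
         total += read;
      }
      return total;
   }
}
```

To upload a file, the source would be a FileStream and the target the stream returned by data.OpenWrite(false); for a download it is the other way around with data.OpenRead().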

To create a new instance of the SqlBinaryData class you can use the constructor. The parameter pkParam specifies the primary key column.
public SqlBinaryData(string connectionString, string tableName, string tableSchema,
                     string binaryColumn, SqlParameter pkParam, object pkValue, 
                     int bufferSize);
In case of an INT, BIGINT or UNIQUEIDENTIFIER primary key column, you can also use one of the static factory methods of SqlBinaryData (each one with and without specifying a table schema name):
public static SqlBinaryData CreateIntPrimaryKey(string connectionString, 
      string tableName, string tableSchema, string binaryColumn, int pkValue, int bufferSize);

public static SqlBinaryData CreateIntPrimaryKey(string connectionString, 
      string tableName, string binaryColumn, int pkValue, int bufferSize);

public static SqlBinaryData CreateLongPrimaryKey(string connectionString, 
      string tableName, string tableSchema, string binaryColumn, long pkValue, int bufferSize);

public static SqlBinaryData CreateLongPrimaryKey(string connectionString, 
      string tableName, string binaryColumn, long pkValue, int bufferSize);

public static SqlBinaryData CreateGuidPrimaryKey(string connectionString, 
      string tableName, string binaryColumn, Guid pkValue, int bufferSize);

public static SqlBinaryData CreateGuidPrimaryKey(string connectionString, 
      string tableName, string tableSchema, string binaryColumn, Guid pkValue, int bufferSize);

Transactions

Since the classes create their database connections internally, you cannot directly use a SqlTransaction to scope your DML operations. However, since the ADO.NET data provider for SQL Server supports transaction scopes, it is still possible to ensure isolated processing by using System.Transactions.TransactionScope.
using (TransactionScope tran = new TransactionScope()) {
   using (var cn = CreateConnection()) {
      // do stuff
   }

   var binaryData = CreateBinary(10L, 16);
   using (var stream = binaryData.OpenWrite(false)) {
      byte[] toWrite = Guid.NewGuid().ToByteArray();
      stream.Write(toWrite, 0, toWrite.Length);
   }

   using (var cn = CreateConnection()) {
      // do more stuff
   }

   // without Complete() the scope is rolled back when it is disposed
   tran.Complete();
}

Server Side Resource Allocation

Since we can specify the exact buffer size to work with and chunks are always taken in this size, we know that we are fine on the client side. The other side to look at is which resources are allocated on the server side. We will do this by imitating the class's behavior in SQL Server Management Studio and checking the results in SQL Server Profiler.

Here is the setup for the following tests.
SET NOCOUNT ON;

IF (OBJECT_ID('TestBlob') IS NOT NULL)
   DROP TABLE TestBlob;

CREATE TABLE TestBlob (
   Id INT NOT NULL PRIMARY KEY CLUSTERED
   ,Data VARBINARY(MAX)
);

INSERT INTO TestBlob VALUES (1, 0x);
INSERT INTO TestBlob VALUES (2, 0x);
GO
First, let's UPDATE a row, with 50MB of binary data in one single batch.
-- one big update with 50MB of data
DECLARE @data VARBINARY(MAX) = 
   CONVERT(VARBINARY(MAX), REPLICATE(CONVERT(VARBINARY(MAX), NEWID()), 65536 * 50));

UPDATE TestBlob SET Data = @data WHERE Id = 1;
A look into SQL Server Profiler shows, as expected, a huge resource allocation.

Now, let's get the same amount of data into SQL Server by using the WRITE method and sending 10 chunks of 5MB. To keep the test valid we have to remember the current position to write at. To keep this information available beyond the scope of a batch we can use CONTEXT_INFO(), which allows storing up to 128 bytes of custom information for the current session.

SET CONTEXT_INFO 0x; -- clear the context
GO
-- 10 updates, each with 5MB
DECLARE @offset INT = 
   ISNULL(CONVERT(INT, SUBSTRING(CONTEXT_INFO(), 1, 4)), 0);
--PRINT @offset;
DECLARE @data VARBINARY(MAX) = 
   CONVERT(VARBINARY(MAX), REPLICATE(CONVERT(VARBINARY(MAX), NEWID()), 65536 * 5));
DECLARE @data_length INT = DATALENGTH(@data)

UPDATE TestBlob SET Data.WRITE(@data, @offset, @data_length) WHERE Id = 2;
SET @offset += @data_length;

SET CONTEXT_INFO @offset;
GO 10 -- count of batch execution loops
Another look into Profiler shows that we get 10 statements, each with a really nice resource allocation.

Unfortunately, I cannot provide a valid test result for read operations using the SqlDataReader.GetBytes method at the moment. This is caused by an issue in SQL Server Profiler when trying to trace BLOB data (see MS Connect MSFT:EDW - Profiler displays "Trace Skipped Records" for large (>530Kb) batch). I don't want to show test results with too small binary data, since the measuring inaccuracy might be too high. If you know a way to get valid test results for this, I would be happy to hear about it.

Restrictions

The current version of this class supports only single-column primary keys. An exception will be thrown if a primary key consists of more than one column. Let me know if there is a need for a version that supports composite primary keys.

The table containing the binary data is required to have a primary key; the class does not work with tables that lack one. An exception is thrown if an accessed table does not have a primary key.

The class supports only columns of type VARBINARY(MAX). If there is a need for a VARCHAR(MAX)/NVARCHAR(MAX) based version, let me know.

To avoid unwanted database locks, the class does not handle concurrent client sessions. If data accessed by an active read or write stream is modified by another user, the processed data will most likely become corrupted on the client or even on the server side. To secure write operations, use a TransactionScope surrounding the writing stream.

IBinaryData Interface

The SqlBinaryData class implements an interface called IBinaryData. This interface is not required for using this class, but I use it for unit testing purposes in business layers of a project where it allows me to work with files of a directory instead of data from a database. Feel free to remove it.

Attachments

Here you can find a ZIP archive containing all classes of SqlBinaryData and an NUnit based test class.

3 comments:

  1. Thanks very much. This really helped. It sucks that there's no streaming interface for BLOBs in ADO.NET, something that JDBC provided a long time ago.

  2. Thanks for your Feedback. Yes, it's a bit annoying that .NET does not provide such kind of a streaming interface out of the box.
    Flo

  3. Thanks for this, you've given me a workaround for saving BLOB by chunks in VarBinary(MAX)...
