MSSQLWIKI

Karthick P.K on SQL Server

Multi Threaded OVERLAPPED and Nonbuffered I/O Example

Posted by Karthick P.K on March 4, 2012

How to read a file using multiple threads with OVERLAPPED and nonbuffered I/O

Nonbuffered I/O: Allows an application to bypass the Windows cache manager, that is, to disable system caching of the data read from or written to the file. There is no intermediate buffer or cache, so the application has direct control over I/O buffering.

Things to remember:

1. When we use nonbuffered I/O (FILE_FLAG_NO_BUFFERING in CreateFile), the number of bytes read or written has to be an integer multiple of the bytes per sector.

2. GetDiskFreeSpace retrieves information about the specified disk, including the bytes per sector (dwBytesPerSector).

3. When using nonbuffered I/O (FILE_FLAG_NO_BUFFERING), the file offset in the OVERLAPPED structure passed to ReadFile/WriteFile, if specified, must be an integer multiple of the bytes per sector.

4. Buffers used for read and write operations should be physical-sector aligned, which means aligned on memory addresses that are integer multiples of the bytes per sector.

5. GetFileSizeEx retrieves the size of the specified file.

When multiple threads are used to read or write a file, each thread needs its own OVERLAPPED structure, and an event has to be created for the hEvent member of each OVERLAPPED structure.

If the hEvent member of the OVERLAPPED structure is NULL, the system uses the state of the hFile handle to signal when the operation has been completed. This causes confusion when multiple threads use the same file handle to read or write the file.

It is safer to use an event object because of the confusion that can occur when multiple simultaneous overlapped operations are performed on the same file, named pipe, or communications device. In that situation, there is no way to know which operation caused the object's state to be signaled.
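The sector rules listed above come down to a few integer checks. The following is a minimal sketch of those checks in portable C++, with no Windows calls. The helper names RoundUpToSector, IsSectorMultiple and IsBufferSectorAligned are hypothetical and are not part of the program below; the sector size is assumed to come from GetDiskFreeSpace.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: round a byte count up to the next multiple of the
// sector size, as needed for read/write sizes under FILE_FLAG_NO_BUFFERING.
std::uint64_t RoundUpToSector(std::uint64_t value, std::uint32_t sectorSize)
{
    assert(sectorSize > 0);
    return ((value + sectorSize - 1) / sectorSize) * sectorSize;
}

// Hypothetical helper: true when a byte count or file offset is an integer
// multiple of the sector size.
bool IsSectorMultiple(std::uint64_t value, std::uint32_t sectorSize)
{
    assert(sectorSize > 0);
    return value % sectorSize == 0;
}

// Hypothetical helper: true when the buffer address is an integer multiple
// of the sector size.
bool IsBufferSectorAligned(const void* buffer, std::uint32_t sectorSize)
{
    assert(sectorSize > 0);
    return reinterpret_cast<std::uintptr_t>(buffer) % sectorSize == 0;
}
```

For example, with 512-byte sectors a 1000-byte request would be rounded up to 1024 bytes before being passed to ReadFile. Memory returned by VirtualAlloc is page aligned, which is why the program below uses it for the read buffer.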

#include <windows.h>
#include <stdio.h>   // printf
#include <stdlib.h>
#include <time.h>    // _strdate, _strtime

#define ThreadPerProc 1 //If you want more than 1 thread per processor increase this value.

int Dootherwork();

struct SUreadfile
{
int x;
DWORDLONG  dwOffset;
DWORDLONG  dwTotalBytespostedNOffset;

};
  SUreadfile *PSUreadfile;
  int Ureadfile(SUreadfile *PSUreadfile1);

  int  x=0;
  int  i=0;
  SYSTEM_INFO si;
  DWORD dwSectorsPerCluster;
  DWORD dwBytesPerSector;
  DWORD dwNumberOfFreeClusters;
  DWORD dwTotalNumberOfClusters;

  //
  LONGLONG BuffSize=0;
  OVERLAPPED *iAIO=NULL;
  DWORDLONG  dwTotalBytesposted=0;
  DWORDLONG  dwTotalBytespostedwithinoffset=0;
  HANDLE hIFile;
  HANDLE hOFile;
  HANDLE *hEvent;
 char dateStr [9];
 char timeStr [9];

int main(int argc, char* argv[])
{

  _strdate( dateStr);
  _strtime( timeStr );
  printf("\n Date:%s\t", dateStr);
  printf("Time is%s\t", timeStr);
  if (argc<3)
  {
		printf("Usage: pass two parameters, the source file and the destination file. The program simulates asynchronous I/O by copying the source to the destination.");
		return 1;
  }

 //Open source file
 //FILE_FLAG_OVERLAPPED   - process input or output asynchronously.
 //FILE_FLAG_NO_BUFFERING - open the file with no system caching for data reads and writes.
 //CreateFileA is called explicitly because argv[1] is a narrow (char*) string.
 hIFile=CreateFileA(argv[1],GENERIC_READ,FILE_SHARE_READ,NULL,OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL
                           | FILE_FLAG_OVERLAPPED
                           | FILE_FLAG_NO_BUFFERING
                           ,NULL);
 if (INVALID_HANDLE_VALUE==hIFile)
		  {
			printf("Unable to open file %s.  Error=%d\n",argv[1], GetLastError());
			return 1;
		  }
//Create destination file
 hOFile=CreateFileA(argv[2],GENERIC_WRITE,FILE_SHARE_READ,NULL,CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL,NULL);
	 if (INVALID_HANDLE_VALUE==hOFile)
		  {
			printf("Unable to create file %s.  Error=%d\n",argv[2], GetLastError());
			return 1;
		  }

//GetSystemInfo retrieves information about the current system. The output is written to a SYSTEM_INFO structure.
 GetSystemInfo(&si);
 /*GetDiskFreeSpace retrieves information about the specified disk, including dwBytesPerSector. When using FILE_FLAG_NO_BUFFERING,
 the file offset in the OVERLAPPED structure and nNumberOfBytesToRead in ReadFile
 must be integer multiples of dwBytesPerSector.
 Passing NULL as the root path queries the disk of the current directory, so run the program from the drive that holds the source file.*/
 if (!GetDiskFreeSpace(NULL,&dwSectorsPerCluster,&dwBytesPerSector,&dwNumberOfFreeClusters,&dwTotalNumberOfClusters))
 {
	printf("\nGetDiskFreeSpace failed with error:%d",GetLastError());
	return 1;
 }
 LARGE_INTEGER *fsize;
 fsize =new LARGE_INTEGER;

 dwBytesPerSector=dwBytesPerSector*4; //Each read transfers dwBytesPerSector bytes. For larger files, increase the multiplier to read more data in one shot; the result must stay a multiple of the sector size.

//GetFileSizeEx retrieves the size of the specified file.

 if (!GetFileSizeEx (hIFile,fsize) || fsize->QuadPart==0)
      {
      printf("\nUnable to get the size of file, or the file is empty. Error:%d ",GetLastError());
      return 1;
      }
    else
      {
          printf("\nFile size is: %lld Bytes",fsize->QuadPart);

		  if (fsize->QuadPart > dwBytesPerSector)
          {
			  /*Later in this program we create one thread per processor and read the file in parallel.
			  The logic below calculates the number of bytes each thread will read.
			  Remember that with nonbuffered I/O (FILE_FLAG_NO_BUFFERING) reads have to be in multiples of bytes per sector, so the file size
			  is first rounded up to a multiple of (number of processors * bytes per sector) and then divided evenly among the threads.
			  */

			  LONGLONG chunk=(LONGLONG)si.dwNumberOfProcessors*dwBytesPerSector;
			  BuffSize=((fsize->QuadPart+chunk-1)/chunk)*chunk; //Round up; a file that is already an exact multiple is left unchanged
			  BuffSize=BuffSize/si.dwNumberOfProcessors;
          }
          else
          {
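              BuffSize=dwBytesPerSector; //The file fits in a single read, so there is no need to split it. One read unit per thread keeps every starting offset sector aligned; threads that start beyond the end of the file just get ERROR_HANDLE_EOF.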
          }
      }

	iAIO =new OVERLAPPED[si.dwNumberOfProcessors*ThreadPerProc];// Create overlapped structure for each thread
	HANDLE *h;
    h = new HANDLE[(si.dwNumberOfProcessors*ThreadPerProc)]; // Pointer  array to hold thread handles
	PSUreadfile= new SUreadfile[256];
	int OffsetHigh=0;
	hEvent=new HANDLE[(si.dwNumberOfProcessors*ThreadPerProc)];
	/*Create an event for the hEvent member of each OVERLAPPED structure. If the hEvent member is NULL, the system
	uses the state of the hFile handle to signal when the operation has been completed.
	This will cause confusion when multiple threads are using the same file handle to read or write the file.
	It is safer to use an event object because of the
	confusion that can occur when multiple simultaneous overlapped operations are performed on the same file, named pipe, or communications device.
	In that situation, there is no way to know which operation caused the object's state to be signaled.*/

	//Create the threads and assign the starting file offset to each thread for the read operation
	for ( i=0;i<(si.dwNumberOfProcessors*ThreadPerProc);i++)
			{
			ZeroMemory(&PSUreadfile[i],sizeof(PSUreadfile[i]));
 			PSUreadfile[i].x=i;
			ZeroMemory (&iAIO[i],sizeof(iAIO[i])); //Clears Internal, InternalHigh, Offset, OffsetHigh and hEvent
			PSUreadfile[i].dwOffset=dwTotalBytesposted;
			hEvent[i] = CreateEvent(NULL, TRUE, FALSE, NULL); //Manual-reset event, initially non-signaled
			if(hEvent[i]) //Check the event just created, not the array pointer
			{
			 iAIO[i].hEvent = hEvent[i];
			}
			 else
			{
			 printf("\nCreate event failed with error:%d",GetLastError());
			}

			h[i]=CreateThread(0,0,(LPTHREAD_START_ROUTINE  )Ureadfile,(LPVOID)&PSUreadfile[i],  0,  NULL);

			if (!h[i])
			{
			printf("Thread creation failure :%d", GetLastError());
			}

			dwTotalBytesposted=dwTotalBytesposted+BuffSize;

		}

	WaitForMultipleObjects((si.dwNumberOfProcessors*ThreadPerProc), h,TRUE,INFINITE);  // Wait for all the threads to complete. A single call can wait on at most MAXIMUM_WAIT_OBJECTS (64) handles.
  _strdate( dateStr);
  _strtime( timeStr );
  printf("\n Date:%s\t", dateStr);
  printf("Time is%s\t", timeStr);

  CloseHandle(hIFile);
  CloseHandle(hOFile);
  return 0;
}

int Ureadfile(SUreadfile *PSUreadfile1)
{

	int z=PSUreadfile1->x;
	SIZE_T dwSize=dwBytesPerSector;
	/*Offset and OffsetHigh of the OVERLAPPED structure are the low-order and high-order 32 bits of the file position at which to start the I/O request.
	Offset is a DWORD, so its maximum value is 2^32-1 (4294967295). A starting offset beyond that carries its upper 32 bits in OffsetHigh.*/
	iAIO[z].Offset=(DWORD)(PSUreadfile[z].dwOffset & 0xFFFFFFFF);
	iAIO[z].OffsetHigh=(DWORD)(PSUreadfile[z].dwOffset >> 32);

	DWORD dwBytesRead=0;
	LONGLONG dwTotalBytesRead=0;
	BOOL	RF;
	BOOL	WF;
	wchar_t *IBuffer = (wchar_t *) VirtualAlloc(NULL,  dwSize, MEM_RESERVE| MEM_COMMIT,PAGE_READWRITE);

	 while (BuffSize>dwTotalBytesRead)
	{

		RF=0;

		RF=ReadFile(hIFile,IBuffer,dwSize, &dwBytesRead,&iAIO[z]); // pass  a pointer to an OVERLAPPED structure (iAIO)

		if ((RF==0) && GetLastError()==997)      //ERROR_IO_PENDING                 997L
        {

			/*bWait parameter of GetOverlappedResult can be set to TRUE or FALSE.
			If this parameter is TRUE, and the Internal member of the lpOverlapped structure is STATUS_PENDING, the function
			does not return until the operation has been completed. If this parameter is FALSE and the operation is still pending, the function
			returns FALSE and the GetLastError function returns ERROR_IO_INCOMPLETE	*/
            while( !GetOverlappedResult( hIFile,&iAIO[z],&dwBytesRead,TRUE))
                {

                    if (GetLastError()==996)//ERROR_IO_INCOMPLETE  (Not signaled)            996L
                    {
                    printf("\nI/O pending: %d .",GetLastError());
                    //Dootherwork();
					/*If the bWait parameter of GetOverlappedResult is set to FALSE, the thread can do other work while the I/O is in progress.
					Change the bWait parameter to FALSE in GetOverlappedResult and uncomment the Dootherwork call above to simulate true overlapped I/O.
					*/
                    }
                    else if  (GetLastError()==38) //ERROR_HANDLE_EOF                 38L
                    {
                    printf("\nEnd of file reached.");
                    break;
                    }
                    else
                    {
                    printf("GetOverlappedResult failed with error:%d,Offset:%d",GetLastError(),iAIO[z].Offset);
                    break;
                    }

                }

        }
        else if ((RF==0)  && GetLastError()!=997 &&   GetLastError()!=38 )
        {
            printf ("\nError reading file :%d offset-%d",GetLastError(),iAIO[z].Offset);
            return 0;
        }

		else if ((RF==0)  && GetLastError()==38 )
        {
            printf ("\nEnd of file reached file :%d offset:%d",GetLastError(),iAIO[z].Offset);
            return 0;
        }

		WF= WriteFile(hOFile,IBuffer,iAIO[z].InternalHigh,NULL,&iAIO[z]);  //Write the buffers which we read to new file.

		if (!WF)
		 {
			printf("\nWrite file operation failed. Error:%d",GetLastError());
		 }

	    dwTotalBytesRead=dwTotalBytesRead + iAIO[z].InternalHigh;

		//Advance the 64-bit file offset by the number of bytes just transferred, carrying into OffsetHigh when the low DWORD overflows
		ULONGLONG nextOffset=(((ULONGLONG)iAIO[z].OffsetHigh<<32) | iAIO[z].Offset) + iAIO[z].InternalHigh;
		iAIO[z].Offset=(DWORD)(nextOffset & 0xFFFFFFFF);
		iAIO[z].OffsetHigh=(DWORD)(nextOffset >> 32);

		if (dwSize >iAIO[z].InternalHigh)
		{
		  printf("\nEnd of file reached %ld-%ld",iAIO[z].InternalHigh,dwSize);
		 break;
		}

	}

	printf("\nThread Id:%d completed ReadFile operation  of %lld bytes",z,dwTotalBytesRead);
  return 1;
}

int Dootherwork()
{

	x=x+1;
	printf("\nWe are doing other work when overlapped I/O read is in progress-%d -Sleeping for 1 Milli second",x);
	Sleep(1);
	return 0;
}
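
The way the program divides the file among threads, and the way a 64-bit starting offset maps to the Offset and OffsetHigh members of OVERLAPPED, can be checked in isolation. Below is a minimal sketch in portable C++ with no Windows calls. ThreadRange and PlanThreadRanges are hypothetical names used only for this illustration; offsetLow and offsetHigh stand in for OVERLAPPED.Offset and OVERLAPPED.OffsetHigh. The rounding follows the same idea as the BuffSize calculation above but is not the exact code of the program.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical description of the part of the file one thread reads.
struct ThreadRange
{
    std::uint64_t start;      // absolute file offset where the thread starts
    std::uint64_t length;     // bytes assigned to the thread (sector multiple)
    std::uint32_t offsetLow;  // would go into OVERLAPPED.Offset
    std::uint32_t offsetHigh; // would go into OVERLAPPED.OffsetHigh
};

// Hypothetical planner: split fileSize bytes across threadCount threads so
// that every starting offset and every length is a multiple of sectorSize.
std::vector<ThreadRange> PlanThreadRanges(std::uint64_t fileSize,
                                          std::uint32_t threadCount,
                                          std::uint32_t sectorSize)
{
    assert(threadCount > 0 && sectorSize > 0);

    // Even share per thread, rounded up to a whole number of sectors.
    std::uint64_t perThread = (fileSize + threadCount - 1) / threadCount;
    perThread = ((perThread + sectorSize - 1) / sectorSize) * sectorSize;

    std::vector<ThreadRange> ranges;
    for (std::uint32_t i = 0; i < threadCount; ++i)
    {
        const std::uint64_t start = perThread * i;
        if (start >= fileSize)
            break; // nothing left to read for the remaining threads

        ThreadRange r;
        r.start = start;
        r.length = perThread;
        r.offsetLow = static_cast<std::uint32_t>(start & 0xFFFFFFFFu);
        r.offsetHigh = static_cast<std::uint32_t>(start >> 32);
        ranges.push_back(r);
    }
    return ranges;
}
```

With a 10,000,000,000-byte file, 4 threads and a 4096-byte read unit, each thread gets 2,500,001,792 bytes, and the third thread starts past the 4 GB mark, so its offsetHigh is 1. A file smaller than one read unit yields a single range, so only one thread would be needed.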
