目录

 

五、CLR线程池的I/O线程

在前一节所介绍的线程都属于CLR线程池的工作者线程,这一节开始为大家介绍一下CLR线程池的I/O线程

I/O 线程是.NET专为访问外部资源所设置的一种线程,因为访问外部资源常常要受到外界因素的影响,为了防止让主线程受影响而长期处于阻塞状态,.NET为多 个I/O操作都建立起了异步方法,例如:FileStream、TCP/IP、WebRequest、WebService等等,而且每个异步方法的使用 方式都非常类似,都是以BeginXXX为开始,以EndXXX结束,下面为大家一一解说。

 

5.1  异步读写 FileStream

需要在 FileStream 异步调用 I/O线程,必须使用以下构造函数建立 FileStream 对象,并把useAsync设置为 true。

FileStream stream = new FileStream ( string path, FileMode mode, FileAccess access, FileShare share, int bufferSize,bool useAsync ) ;

其中 path 是文件的相对路径或绝对路径; mode 确定如何打开或创建文件; access 确定访问文件的方式; share 确定文件如何进程共享; bufferSize 是代表缓冲区大小,一般默认最小值为8,在启动异步读取或写入时,文件大小一般大于缓冲大小; userAsync代表是否启动异步I/O线程。

注意:当使用 BeginRead 和 BeginWrite 方法在执行大量读或写时效果更好,但对于少量的读/写,这些方法速度可能比同步读取还要慢,因为进行线程间的切换需要大量时间。

 

5.1.1 异步写入

FileStream中包含BeginWrite、EndWrite 方法可以启动I/O线程进行异步写入。

public override IAsyncResult BeginWrite ( byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject )

public override void EndWrite (IAsyncResult asyncResult )

 

BeginWrite 返回值为IAsyncResult, 使用方式与委托的BeginInvoke方法相似,最好就是使用回调函数,避免线程阻塞。在最后两个参数中,参数AsyncCallback用于绑定回调 函数; 参数Object用于传递外部数据。要注意一点:AsyncCallback所绑定的回调函数必须是带单个 IAsyncResult 参数的无返回值方法。

在例子中,把FileStream作为外部数据传递到回调函数当中,然后在回调函数中利用IAsyncResult.AsyncState获取FileStream对象,最后通过FileStream.EndWrite(IAsyncResult)结束写入。

class Program     {         static void Main(string[] args)         {             //把线程池的最大值设置为1000              ThreadPool.SetMaxThreads(1000, 1000);             ThreadPoolMessage("Start");              //新立文件File.sour              FileStream stream = new FileStream("File.sour", FileMode.OpenOrCreate,                                         FileAccess.ReadWrite,FileShare.ReadWrite,1024,true);             byte[] bytes = new byte[16384];             string message = "An operating-system ThreadId has no fixed relationship........";             bytes = Encoding.Unicode.GetBytes(message);              //启动异步写入              stream.BeginWrite(bytes, 0, (int)bytes.Length,new AsyncCallback(Callback),stream);             stream.Flush();                          Console.ReadKey();         }          static void Callback(IAsyncResult result)         {             //显示线程池现状              Thread.Sleep(200);             ThreadPoolMessage("AsyncCallback");             //结束异步写入              FileStream stream = (FileStream)result.AsyncState;             stream.EndWrite(result);             stream.Close();         }          //显示线程池现状          static void ThreadPoolMessage(string data)         {             int a, b;             ThreadPool.GetAvailableThreads(out a, out b);             string message = string.Format("{0}\n  CurrentThreadId is {1}\n  "+                   "WorkerThreads is:{2}  CompletionPortThreads is :{3}",                   data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());             Console.WriteLine(message);         }     }

由输出结果可以看到,在使用FileStream.BeginWrite方法后,系统将自动启动CLR线程池中I/O线程。

 

5.1.2 异步读取

FileStream 中包含 BeginRead 与 EndRead 可以异步调用I/O线程进行读取。

public override IAsyncResult BeginRead ( byte[] array,int offset,int numBytes, AsyncCallback userCallback,Object stateObject)

public override int EndRead(IAsyncResult asyncResult)

 

其使用方式与BeginWrite和EndWrite相似,AsyncCallback用于绑定回调函数; Object用于传递外部数据。在回调函数只需要使用IAsyncResut.AsyncState就可获取外部数据。EndWrite 方法会返回从流读取到的字节数量。

首先定义 FileData 类,里面包含FileStream对象,byte[] 数组和长度。然后把FileData对象作为外部数据传到回调函数,在回调函数中,把IAsyncResult.AsyncState强制转换为 FileData,然后通过FileStream.EndRead(IAsyncResult)结束读取。最后比较一下长度,若读取到的长度与输入的数据 长度不一至,则抛出异常。

1      class Program 2      { 3          public class FileData 4          { 5              public FileStream Stream; 6              public int Length; 7              public byte[] ByteData; 8          } 9  10          static void Main(string[] args)11          {       12              //把线程池的最大值设置为1000 13              ThreadPool.SetMaxThreads(1000, 1000);14              ThreadPoolMessage("Start");15              ReadFile();16  17              Console.ReadKey();18          }19  20          static void ReadFile()21          {22              byte[] byteData=new byte[80961024];23              FileStream stream = new FileStream("File1.sour", FileMode.OpenOrCreate, 24                                      FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true);25              26              //把FileStream对象,byte[]对象,长度等有关数据绑定到FileData对象中,以附带属性方式送到回调函数 27              FileData fileData = new FileData();28              fileData.Stream = stream;29              fileData.Length = (int)stream.Length;30              fileData.ByteData = byteData;31              32              //启动异步读取 33              stream.BeginRead(byteData, 0, fileData.Length, new AsyncCallback(Completed), fileData);34          }35   36          static void Completed(IAsyncResult result)37          {38              ThreadPoolMessage("Completed");39  40              //把AsyncResult.AsyncState转换为FileData对象,以FileStream.EndRead完成异步读取 41              FileData fileData = (FileData)result.AsyncState;42              int length=fileData.Stream.EndRead(result);43              fileData.Stream.Close();44  45              //如果读取到的长度与输入长度不一致,则抛出异常 46              if (length != fileData.Length)47                  throw new Exception("Stream is not complete!");48  49              string data=Encoding.ASCII.GetString(fileData.ByteData, 0, fileData.Length);50              Console.WriteLine(data.Substring(2,22));51          }52  53          //显示线程池现状 54          static void ThreadPoolMessage(string data)55          {56              int a, b;57              ThreadPool.GetAvailableThreads(out a, out b);58              string message = string.Format("{0}\n  CurrentThreadId is {1}\n  "+59                           "WorkerThreads is:{2}  CompletionPortThreads is :{3}",60                           data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());61              Console.WriteLine(message);      62          }63              64    }

由输出结果可以看到,在使用FileStream.BeginRead方法后,系统将自动启动CLR线程池中I/O线程。

 

注意:如果你看到的测试结果正好相反:工作者线程为999,I/O线程为1000,这是因为FileStream的文件容量小于缓冲值1024所致的。此时文件将会一次性读取或写入,而系统将启动工作者线程而非I/O线程来处理回调函数。

 

 

5.2 异步操作TCP/IP套接字

在介绍 TCP/IP 套接字前先简单介绍一下 NetworkStream 类,它是用于网络访问的基础数据流。 NetworkStream 提供了好几个方法控制套接字数据的发送与接收, 其中BeginRead、EndRead、BeginWrite、EndWrite 能够实现异步操作,而且异步线程是来自于CLR线程池的I/O线程。

public override int ReadByte ()

public override int Read (byte[] buffer,int offset, int size)

public override void WriteByte (byte value)

public override void Write (byte[] buffer,int offset, int size)

public override IAsyncResult BeginRead (byte [] buffer, int offset, int size,  AsyncCallback callback, Object state )

public override int EndRead(IAsyncResult result)

public override IAsyncResult BeginWrite (byte [] buffer, int offset, int size,  AsyncCallback callback, Object state )

public override void EndWrite(IAsyncResult result)

 

若要创建 NetworkStream,必须提供已连接的 Socket。而在.NET中使用TCP/IP套接字不需要直接与Socket打交道,因为.NET把Socket的大部分操作都放在 System.Net.TcpListener和System.Net.Sockets.TcpClient里面,这两个类大大地简化了Socket的操 作。一般套接字对象Socket包含一个Accept()方法,此方法能产生阻塞来等待客户端的请求,而在TcpListener类里也包含了一个相似的 方法 public TcpClient AcceptTcpClient()用于等待客户端的请求。此方法将会返回一个TcpClient 对象,通过 TcpClient 的 public NetworkStream GetStream()方法就能获取NetworkStream对象,控制套接字数据的发送与接收。

 

下面以一个例子说明异步调用TCP/IP套接字收发数据的过程。

首先在服务器端建立默认地址127.0.0.1用于收发信息,使用此地址与端口500新建TcpListener对象,调用TcpListener.Start 侦听传入的连接请求,再使用一个死循环来监听信息。

在ChatClient类包括有接收信息与发送信息两个功能:当接收到客户端请求时,它会利 用 NetworkStream.BeginRead 读取客户端信息,并在回调函数ReceiveAsyncCallback中输出信息内容,若接收到的信息的大小小于1时,它将会抛出一个异常。当信息成功 接收后,再使用 NetworkStream.BeginWrite 方法回馈信息到客户端

class Program     {         static void Main(string[] args)         {             //设置CLR线程池最大线程数              ThreadPool.SetMaxThreads(1000, 1000);                 //默认地址为127.0.0.1              IPAddress ipAddress = IPAddress.Parse("127.0.0.1");             TcpListener tcpListener = new TcpListener(ipAddress, 500);             tcpListener.Start();                          //以一个死循环来实现监听              while (true)             {   //调用一个ChatClient对象来实现监听                  ChatClient chatClient = new ChatClient(tcpListener.AcceptTcpClient());                 }         }     }      public class ChatClient     {         static TcpClient tcpClient;         static byte[] byteMessage;         static string clientEndPoint;          public ChatClient(TcpClient tcpClient1)         {             tcpClient = tcpClient1;             byteMessage = new byte[tcpClient.ReceiveBufferSize];                         //显示客户端信息              clientEndPoint = tcpClient.Client.RemoteEndPoint.ToString();             Console.WriteLine("Client's endpoint is " + clientEndPoint);                          //使用NetworkStream.BeginRead异步读取信息              NetworkStream networkStream = tcpClient.GetStream();             networkStream.BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize,                                          new AsyncCallback(ReceiveAsyncCallback), null);         }          public void ReceiveAsyncCallback(IAsyncResult iAsyncResult)         {             //显示CLR线程池状态              Thread.Sleep(100);             ThreadPoolMessage("\nMessage is receiving");              //使用NetworkStream.EndRead结束异步读取              NetworkStream networkStreamRead = tcpClient.GetStream();             int length=networkStreamRead.EndRead(iAsyncResult);              //如果接收到的数据长度少于1则抛出异常              if (length < 1)             {                 tcpClient.GetStream().Close();                 throw new Exception("Disconnection!");             }              //显示接收信息              string message = Encoding.UTF8.GetString(byteMessage, 0, length);             Console.WriteLine("Message:" + message);              //使用NetworkStream.BeginWrite异步发送信息              byte[] sendMessage = Encoding.UTF8.GetBytes("Message is received!");             NetworkStream networkStreamWrite=tcpClient.GetStream();             networkStreamWrite.BeginWrite(sendMessage, 0, sendMessage.Length,                                              new AsyncCallback(SendAsyncCallback), null);         }          //把信息转换成二进制数据,然后发送到客户端          public void SendAsyncCallback(IAsyncResult iAsyncResult)         {             //显示CLR线程池状态              Thread.Sleep(100);             ThreadPoolMessage("\nMessage is sending");              //使用NetworkStream.EndWrite结束异步发送              tcpClient.GetStream().EndWrite(iAsyncResult);              //重新监听              tcpClient.GetStream().BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize,                                                new AsyncCallback(ReceiveAsyncCallback), null);         }          //显示线程池现状          static void ThreadPoolMessage(string data)         {             int a, b;             ThreadPool.GetAvailableThreads(out a, out b);             string message = string.Format("{0}\n  CurrentThreadId is {1}\n  " +                   "WorkerThreads is:{2}  CompletionPortThreads is :{3}\n",                   data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());              Console.WriteLine(message);         }     }

而在客户端只是使用简单的开发方式,利用TcpClient连接到服务器端,然后调用NetworkStream.Write方法发送信息,最后调用NetworkStream.Read方法读取回馈信息

1         static void Main(string[] args) 2         { 3             //连接服务端  4             TcpClient tcpClient = new TcpClient("127.0.0.1", 500); 5  6             //发送信息  7             NetworkStream networkStream = tcpClient.GetStream(); 8             byte[] sendMessage = Encoding.UTF8.GetBytes("Client request connection!"); 9             networkStream.Write(sendMessage, 0, sendMessage.Length);10             networkStream.Flush();11 12             //接收信息 13             byte[] receiveMessage=new byte[1024];14             int count=networkStream.Read(receiveMessage, 0,1024);15             Console.WriteLine(Encoding.UTF8.GetString(receiveMessage));16             Console.ReadKey();17         }

注意观察运行结果,服务器端的异步操作线程都是来自于CLR线程池的I/O线程

 

5.3 异步WebRequest

System.Net.WebRequest 是 .NET 为实现访问 Internet 的 “请求/响应模型” 而开发的一个 abstract 基类, 它 主要有三个子类:FtpWebRequest、HttpWebRequest、FileWebRequest。当使用 WebRequest.Create(string uri)创建对象时,应用程序就可以根据请求协议判断实现类来进行操作。FileWebRequest、FtpWebRequest、 HttpWebRequest 各有其作用:FileWebRequest 使用 “file://路径” 的URI方式实现对本地资源和内部文件的请求/响应、FtpWebRequest 使用FTP文件传输协议实现文件请求/响应、HttpWebRequest 用于处理HTTP的页面请求/响应。由于使用方法相类似,下面就以常用的HttpWebRequest为例子介绍一下异步WebRequest的使用方法。

在使用ASP.NET开发网站的时候,往往会忽略了HttpWebRequest的使用,因为开发 都假设客户端是使用浏览器等工具去阅读页面的。但如果你对REST开发方式有所了解,那对 HttpWebRequest 就应该非常熟悉。它可以在路径参数、头文件、页面主体、Cookie 等多处地方加入请求条件,然后对回复数据进行适当处理。HttpWebRequest 包含有以下几个常用方法用于处理请求/响应:

public override Stream GetRequestStream ()

public override WebResponse GetResponse ()

public override IAsyncResult BeginGetRequestStream ( AsyncCallback callback, Object state )

public override Stream EndGetRequestStream ( IAsyncResult asyncResult )
public override IAsyncResult BeginGetResponse ( AsyncCallback callback, Object state )
public override WebResponse EndGetResponse ( IAsyncResult asyncResult )

其中BeginGetRequestStream、EndGetRequestStream 用于异步向HttpWebRequest对象写入请求信息;  BeginGetResponse、EndGetResponse 用于异步发送页面请求并获取返回信息。使用异步方式操作Internet的“请求/响应”,避免主线程长期处于等待状态,而操作期间异步线程是来自CLR 线程池的I/O线程。

注意:请求与响应不能使用同步与异步混合开发模式,即当请求写入使用GetRequestStream同步模式,即使响应使用BeginGetResponse异步方法,操作也与GetRequestStream方法在于同一线程内。

下面以简单的例子介绍一下异步请求的用法。

首先为Person类加上可序列化特性,在服务器端建立Hanlder.ashx,通过Request.InputStream 获取到请求数据并把数据转化为String对象,此实例中数据是以 “Id:1” 的形式实现传送的。然后根据Id查找对应的Person对象,并把Person对象写入Response.OutStream 中返还到客户端。

在客户端先把 HttpWebRequird.Method 设置为 "post",使用异步方式通过BeginGetRequireStream获取请求数据流,然后写入请求数据 “Id:1”。再使用异步方法BeginGetResponse 获取回复数据,最后把数据反序列化为Person对象显示出来。

注意:HttpWebRequire.Method默认为get,在写入请求前必须把HttpWebRequire.Method设置为post,否则在使用BeginGetRequireStream 获取请求数据流的时候,系统就会发出 “无法发送具有此谓词类型的内容正文" 的异常。

Model

namespace Model {     [Serializable]     public class Person     {         public int ID         {             get;             set;         }         public string Name         {             get;             set;         }         public int Age         {             get;             set;         }     } }

服务器端

1 public class Handler : IHttpHandler { 2  3     public void Proce***equest(HttpContext context) 4     { 5         //把信息转换为String,找出输入条件Id  6         byte[] bytes=new byte[1024]; 7         int length=context.Request.InputStream.Read(bytes,0,1024); 8         string condition = Encoding.Default.GetString(bytes); 9         int id = int.Parse(condition.Split(new string[] { ":" }, 10                            StringSplitOptions.RemoveEmptyEntries)[1]);11         12         //根据Id查找对应Person对象 13         var person = GetPersonList().Where(x => x.ID == id).First();14         15         //所Person格式化为二进制数据写入OutputStream 16         BinaryFormatter formatter = new BinaryFormatter();17         formatter.Serialize(context.Response.OutputStream, person);18     }19 20     //模拟源数据 21     private IList
GetPersonList()22 {23 var personList = new List
();24 25 var person1 = new Person();26 person1.ID = 1;27 person1.Name = "Leslie";28 person1.Age = 30;29 personList.Add(person1);30 ...........31 return personList;32 }33 34 public bool IsReusable35 {36 get { return true;}37 }38 }

客户端

class Program     {         static void Main(string[] args)         {             ThreadPool.SetMaxThreads(1000, 1000);             Request();             Console.ReadKey();         }          static void Request()         {             ThreadPoolMessage("Start");              //使用WebRequest.Create方法建立HttpWebRequest对象              HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create(                                             "http://localhost:5700/Handler.ashx");             webRequest.Method = "post";                         //对写入数据的RequestStream对象进行异步请求              IAsyncResult result=webRequest.BeginGetRequestStream(                 new AsyncCallback(EndGetRequestStream),webRequest);         }          static void EndGetRequestStream(IAsyncResult result)         {             ThreadPoolMessage("RequestStream Complete");             //获取RequestStream              HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState;             Stream stream=webRequest.EndGetRequestStream(result);              //写入请求条件              byte[] condition = Encoding.Default.GetBytes("Id:1");             stream.Write(condition, 0, condition.Length);              //异步接收回传信息              IAsyncResult responseResult = webRequest.BeginGetResponse(                 new AsyncCallback(EndGetResponse), webRequest);         }          static void EndGetResponse(IAsyncResult result)         {             //显出线程池现状              ThreadPoolMessage("GetResponse Complete");              //结束异步请求,获取结果              HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState;             WebResponse webResponse = webRequest.EndGetResponse(result);                          //把输出结果转化为Person对象              Stream stream = webResponse.GetResponseStream();             BinaryFormatter formatter = new BinaryFormatter();             var person=(Person)formatter.Deserialize(stream);             Console.WriteLine(string.Format("Person    Id:{0} Name:{1} Age:{2}",                 person.ID, person.Name, person.Age));         }          //显示线程池现状          static void ThreadPoolMessage(string data)         {             int a, b;             ThreadPool.GetAvailableThreads(out a, out b);             string message = string.Format("{0}\n  CurrentThreadId is {1}\n  " +                   "WorkerThreads is:{2}  CompletionPortThreads is :{3}\n",                   data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());              Console.WriteLine(message);         }     }

从运行结果可以看到,BeginGetRequireStream、BeginGetResponse方法是使用CLR线程池的I/O线程。

5.4 异步调用WebService

相比TCP/IP套接字,在使用WebService的时候,服务器端需要更复杂的操作处理,使用时间往往会更长。为了避免客户端长期处于等待状态,在配置服务引用时选择 “生成异步操作”,系统可以自动建立异步调用的方式。

以.NET 2.0以前,系统都是使用ASMX来设计WebService,而近年来WCF可说是火热登场,下面就以WCF为例子简单介绍一下异步调用WebService的例子。

由于系统可以自动生成异步方法,使用起来非常简单,首先在服务器端建立服务 ExampleService,里面包含方法Method。客户端引用此服务时,选择 “生成异步操作”。然后使用 BeginMethod 启动异步方法, 在回调函数中调用EndMethod结束异步调用。

服务端

1      [ServiceContract] 2      public interface IExampleService 3      { 4          [OperationContract] 5          string Method(string name); 6      } 7   8      public class ExampleService : IExampleService 9      {10          public string Method(string name)11          {12              return "Hello " + name;13          }14      }15  16      class Program17      {18          static void Main(string[] args)19          {20              ServiceHost host = new ServiceHost(typeof(ExampleService));21              host.Open();22              Console.ReadKey();23              host.Close();24           }25      }26  27  
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

客户端

class Program      {          static void Main(string[] args)          {              //设置最大线程数               ThreadPool.SetMaxThreads(1000, 1000);              ThreadPoolMessage("Start");                            //建立服务对象,异步调用服务方法               ExampleServiceReference.ExampleServiceClient exampleService = new                                      ExampleServiceReference.ExampleServiceClient();              exampleService.BeginMethod("Leslie",new AsyncCallback(AsyncCallbackMethod),                                           exampleService);                Console.ReadKey();          }            static void AsyncCallbackMethod(IAsyncResult result)          {              Thread.Sleep(1000);              ThreadPoolMessage("Complete");              ExampleServiceReference.ExampleServiceClient example =                  (ExampleServiceReference.ExampleServiceClient)result.AsyncState;              string data=example.EndMethod(result);              Console.WriteLine(data);          }            //显示线程池现状           static void ThreadPoolMessage(string data)          {              int a, b;              ThreadPool.GetAvailableThreads(out a, out b);              string message = string.Format("{0}\n  CurrentThreadId is {1}\n  " +                    "WorkerThreads is:{2}  CompletionPortThreads is :{3}\n",                    data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());                Console.WriteLine(message);          }      }    

注意观察运行结果,异步调用服务时,回调函数都是运行于CLR线程池的I/O线程当中。

对JAVA与.NET开发有兴趣的朋友欢迎加入QQ群:162338858