
Accumulating choices in F#, the case of a distributed database

Author: Josip Zohil, Josip.zohil1@guest.arnes.si

           

Summary

In a distributed relational database system, clients access data in parallel from multiple nodes (parallel execution servers) or from their duplicates. A request to a node can produce one of several responses: a value, an exception indicating that the node is down, or some other exception. When a node is down, we send the request to the next duplicate of that node. The exceptions therefore drive our decisions: we either accept an exception and continue, or reject it and report it.

In this article we present F# client code for parallel data retrieval from a distributed, read-only relational database system using Windows Communication Foundation (WCF). We use a three-case F# data type. Exceptions and other response values are handled and/or accumulated asynchronously, in parallel and without locks.

           

Problem

It is well known that relational database systems do not scale well in the classic client-server architecture. Sometimes it is possible to spread the database across different but interconnected computer systems (nodes). Each node has a relational database manager that manages the tables in its environment. To ensure fault tolerance when a particular node breaks down, the data is made redundant: each database is stored in the system multiple times (on duplication nodes).

For example, annual data from past periods is normally immutable (e.g. annual accounting data). We can aggregate this periodic data (e.g. 10 years of data at 800 MB each) in one table (of 8 GB). Another possibility is to retrieve the data from 10 tables in one database, or from 10 databases on the same server. An 8 GB table and its indexes are relatively large, so data retrieval takes a long time. Having 10 (smaller) databases on one computer system also causes problems (disk access and cache misses).

Spreading the databases across different computers (nodes), with additional duplication nodes, eases the load on each database. Because of the duplication, a single node failure does not stop data retrieval. We "accept" the node failure (exception) and proceed with the next duplicate. Extra work is needed to manage possible failures (exceptions). The client task runs in parallel on all the nodes (parallel requests). Because we have multiple nodes and duplication nodes, one request to the distributed database can produce multiple exceptions: we have to accumulate the response values and, possibly, the exceptions.

The goal is to make distributed data retrieval on a large cluster of computers (nodes) simple enough for ordinary programmers, without manual threading, locks and so on. Programmer intervention is minimal, and to the user the databases appear as if they were a single database.

           

Solution

Spreading the databases across multiple computers eases some of these problems. The nodes do not interact with each other; only the client interacts with them, by message passing. The client can access the data on these nodes by running the same request in parallel on all of them. Individual nodes can be down. To ensure fault tolerance, we duplicate each node's data on other nodes (in our example, on one additional node).

1) Normally, the client sends a request in parallel to the WCF hosts (nodes). Each WCF host dispatches the request to its backend database, receives the response and sends it as a message to the client, where the responses from the parallel nodes are accumulated.

2) If a node is down (a communication or timeout exception), we send the same request (message) to the next duplication node. The client code thus makes decisions based on the node responses (values or exceptions).

3) If all duplication nodes are down, we inform the user about the problem.

4) If there are exceptions, we accumulate them and generate a message for the user.

In a distributed database system, performance is mainly determined by disk quality, the cache hit ratio and network bandwidth.

           

F# console application

           

In this article we study a case where a database is spread over two nodes and two duplication nodes (copies of the primary nodes). The number of nodes is a parameter, so you can use the programs for an arbitrary number of nodes. Every node runs a WCF host. We present the client-side code as a console application in Visual Studio 2008. For pedagogical reasons, the first part of the article presents F# code with the intermediate values of the function composition, and the last part presents compact F# code (with a minimal number of intermediate values).

In the client application, named DistrTwoAccFun, we add a reference to the WCF host dll (WcfServiceLibraryVFP.dll). The project code and the dlls are in the download of this article. The service contract IService1 of this dll exposes, among others, these WCF methods:

IService1.MyOperation1(string) (for testing), with a return value of type string,

IService1.FromStoreProc(string, string, string, string) (to call a stored procedure in the database), whose return value is a DataTable (inside a DataSet) encoded as a byte stream.

           

           

Application configuration

           

<appSettings>

    <add key="numberOfNodes" value="2"/>

           

    <add key="node1address1"      value="net.tcp://192.168.123.165:8731/Design_Time_Addresses/WcfServiceLibraryVFP/Service1/" />

    <add key="node2address1"      value="net.tcp://192.168.123.110:8731/Design_Time_Addresses/WcfServiceLibraryVFP/Service1/" />

    <add key="node1address2"      value="net.tcp://192.168.123.166:8731/Design_Time_Addresses/WcfServiceLibraryVFP/Service1/" />

    <add key="node2address2"      value="net.tcp://192.168.123.111:8731/Design_Time_Addresses/WcfServiceLibraryVFP/Service1/" />

    <add key="connStr" value="Provider=VFPOLEDB.1;Data Source=D:\moj\att2006\data;Collating Sequence=MACHINE"/>

    <add key="fileName" value="d:\moj\compose\mywcf.prg"/>

    <add key="fileSerPath" value="d:\moj\att2006\data\mywcf.prg"/>

    <add key="myNetBinding" value="NetTcpBinding_IService11"/>

</appSettings>

Figure 1. The appSettings section of the app.config file

           

We study the distributed database in a local area network, using a WCF messaging system based on the net.tcp protocol. The nodes are identified by their IP address, port and WCF service address (e.g. node1address1 = net.tcp://192.168.123.165:8731/Design_Time_Addresses/WcfServiceLibraryVFP/Service1/). A key that starts with node1 belongs to the first node and a key that starts with node2 belongs to the second node; the trailing number distinguishes the primary address from its copy. For example, node1address2 is the duplicate of the first node and node2address2 is the duplicate of the second node (see the app.config in Figure 1).

The app.config file (partly shown in Figure 1) also specifies the maximum number of connections (10), the binding (netTcpBinding), the number of nodes (2) and other key values.

           

           

#light

open System

open System.Configuration

module ConfigClient=

    let aps=ConfigurationManager.AppSettings

    let myNetBinding=aps.["myNetBinding"]

    let fileSerPath = aps.["fileSerPath"]

    let fileName =aps.["fileName"]

    let connStr = aps.["connStr"]

    let numberOfNodes = Convert.ToInt32( aps.["numberOfNodes"])

    let ge=aps.AllKeys

    let predicate x (i:int)=x.ToString() .StartsWith("node"+Convert.ToString(i))  

    let mam i=Array.FindAll(aps.AllKeys, (fun x -> (predicate x i) ))

    let nArr (i:int)= Array.map (fun (x:string) -> aps.[x]  ) ( mam  i)

    let arrOfnodeAddress=[| for i in 1..numberOfNodes do yield (nArr i) |]

Figure 2. WCFConfig.fs file

           

Figure 2 shows the code that reads the configuration parameters from the app.config file (Figure 1). Using the predicate, we select the keys that start with "node" followed by the node number, and we transform the data into an array of arrays of node addresses. For example, the first element of the first array is the address of the first database node, the second element is the address of its first copy (duplicate), and so on.
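The grouping step can be tried without an app.config file. The following is a minimal sketch, assuming current F# syntax; the in-memory `settings` list and the host names are hypothetical stand-ins for ConfigurationManager.AppSettings, and `addressesOfNode` is an illustrative name rather than a function from the project.

```fsharp
// Hypothetical stand-in for ConfigurationManager.AppSettings: (key, value) pairs.
let settings =
    [ "numberOfNodes", "2"
      "node1address1", "net.tcp://host-a/Service1/"
      "node2address1", "net.tcp://host-b/Service1/"
      "node1address2", "net.tcp://host-a-copy/Service1/"
      "node2address2", "net.tcp://host-b-copy/Service1/" ]

let numberOfNodes =
    settings |> List.find (fun (k, _) -> k = "numberOfNodes") |> snd |> int

// All addresses whose key starts with "node<i>", in configuration order.
let addressesOfNode (i: int) =
    settings
    |> List.filter (fun (k: string, _) -> k.StartsWith("node" + string i))
    |> List.map snd
    |> List.toArray

// One inner array per node: the primary address first, then its copies.
let arrOfNodeAddress = [| for i in 1 .. numberOfNodes -> addressesOfNode i |]

printfn "%A" arrOfNodeAddress
```

A prefix test of this kind would also match "node10..." when looking for node 1, so a configuration with ten or more nodes would need a stricter key pattern.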

           

Types and fundamental functions

           

#light

open System

           

module OpenWithExc=

  //three options type: opened, not opened, Exception

  type OpenedRepNodesWithExc<'T> =

    |Some  of 'T

    |None

    |Exc of Exception

    //print the results and eventually the duplication nodes array x

    member this.print_value x=

       match this with

       | Some a  -> printfn "This node (from the array) is opened: %A" a

       | None  -> printfn "All nodes are down: %A" x

       | Exc ex  -> printfn "Exception- no node opened: %A" (ex.Message)

 

  //reduce the results from three options to two

  let transformResult x  =

      match x with

      |Some x ->   x

      |None ->  raise (System.Exception ("No node opened ") )

      |Exc ex -> raise (System.Exception (ex.Message ))

  //try find the first node which suits the function (the predicate)

  let tryFindInArrWithExc  f  (arr: _[]) =

        let rec loop i =

            if i >= arr.Length then None else

            match f arr.[i] with

            |Some x  ->

               //printfn "Node OK: %A" arr.[i]

               Some x 

            |None  ->

               //printfn "Try open next node: %A" arr.[i]

               loop (i+1) 

            |Exc ex->

                    //printfn "Exception %A" ex.Message

                    Exc ex 

        loop 0

 

  //continuation with the task execution:

  //catch the results of a previous computation (nodeValue),

  //eventually extracts the service and executes the task 

 

  let continueWith channelTask= (fun nodeValue  ->  

               match nodeValue with

               |Some service  ->

                   let cResult=channelTask service

                   Some (cResult)

               |None ->

                   //printfn "None"

                   None

               |Exc ex ->

                   //printfn "Exc"

                   Exc ex   )

                 

  //Catch a value or exception and produce a choice                  

  let tryChoiceExc tres x =

    try

     Choice2_1 (tres x)

    with exn ->

     Choice2_2(exn)

Figure 3. Types and fundamental functions

           

We present a generic three-case type, OpenedRepNodesWithExc<'T> (Figure 3). We can look at it as a generalization of the Option type: in addition to a value or nothing, it can also carry an exception.

The function transformResult reduces the three-case value to two outcomes: it returns the value, or it raises an exception.

The function tryFindInArrWithExc (Figure 3) takes a function and an array (here, an array of node addresses) and recursively searches the array. It produces a value of type OpenedRepNodesWithExc: the first Some result, None if every element yields None, or the first Exc that occurs.

The "continuation" function continueWith in Figure 3 takes a function and returns a function:

 - the parameter is a function channelTask ('a->'b),

 - the result is a lambda function that transforms a value of type OpenedRepNodesWithExc<'a> into OpenedRepNodesWithExc<'b>.

The function continueWith applies the task only in the Some case and passes None and Exc through unchanged, so it lets us chain a further computation onto the result of a previous one, in the spirit of a monadic continuation.

The function tryChoiceExc returns a two-case type: either a result or an exception.
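The behaviour of the search and the continuation can be checked without any WCF host. The sketch below assumes current F# syntax; the type `NodeResult`, its case names and the fake `probe` function are hypothetical and were chosen so that they do not shadow the built-in Some and None of the Option type.

```fsharp
// Hypothetical case names that mirror Some / None / Exc in Figure 3.
type NodeResult<'T> =
    | Opened of 'T      // a node answered
    | AllDown           // no copy could be reached
    | Failed of exn     // an unexpected error: stop searching

// Try each copy in order; move to the next copy only when one is unreachable.
let tryFindNode (probe: 'a -> NodeResult<'b>) (copies: 'a[]) : NodeResult<'b> =
    let rec loop i =
        if i >= copies.Length then AllDown
        else
            match probe copies.[i] with
            | Opened x -> Opened x
            | AllDown -> loop (i + 1)
            | Failed ex -> Failed ex
    loop 0

// Apply the task only when a node was opened; pass the other cases through.
let continueWith (task: 'a -> 'b) (result: NodeResult<'a>) : NodeResult<'b> =
    match result with
    | Opened x -> Opened (task x)
    | AllDown -> AllDown
    | Failed ex -> Failed ex

// Fake probe: only addresses that end in "-up" are reachable.
let probe (address: string) : NodeResult<string> =
    if address.EndsWith("-up") then Opened address else AllDown

let found =
    [| "a-down"; "b-up"; "c-up" |]
    |> tryFindNode probe
    |> continueWith (fun (s: string) -> s.Length)

let none =
    [| "a-down"; "b-down" |]
    |> tryFindNode probe
    |> continueWith (fun (s: string) -> s.Length)

printfn "%A / %A" found none
```

The first pipeline stops at "b-up" and never probes "c-up", which is the priority behaviour of the sequential search.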

           

Predicate, map and two accumulate functions

           

#light

open System

open System.ServiceModel

open System.ComponentModel

open WcfServiceLibraryVFP

module MapTaskOnArr=

 open WcfConfig.ConfigClient

 open TryOpenNode.OpenWithExc

           

 //define the channel (service) function for the client

 let cf= new ChannelFactory<IService1>(myNetBinding)

 let channel nodeUri=cf.CreateChannel(new EndpointAddress( nodeUri))

 //the filter function on a node duplication array:

 //eventually will give us the pair (IService1,nodeUri)

 let predicateWithExc (nodeUri:string) =

   try

     use ichannelCurr = (channel nodeUri) :?> IClientChannel

     ichannelCurr. Open()

     match ichannelCurr.State= CommunicationState.Opened with

        |true  ->

           Some (channel , nodeUri)  //return a tuple function*string

        |false ->

           //printfn "Closed: %A" (nodeUri )

           None  //comm or time out exception

   with

        | :? System.ServiceModel.CommunicationException ->

           //printfn "Error thrown by comm ex: %A" (nodeUri)

           None

        | :? System.TimeoutException ->

           //printfn "Error thrown by timeout ex: %A" (nodeUri)

           None

        |   ex ->

          //printfn "Error thrown by other ex: %A" (nodeUri)

          rethrow  ()

 //the MAP function on the node array of duplicated nodes arrays

 //array of arrays !!!

 //node address - string

 let allWcfArr

  (channelTask:(string->IService1)*string->'a)

  arrOfnodeAddress =

   arrOfnodeAddress

   |> Array.map 

     (fun nodeDuplicationArr  ->

       //sequentially find the first node in a nodeDuplicationArr,

       //possibly run the continuation task,

       //and reduce the result from three to two choices

       let findAndRun arr =

           arr

           |> tryFindInArrWithExc (fun nodeUri -> predicateWithExc nodeUri)

           |> continueWith channelTask

           |> transformResult

       //sequence of (async) results: each one is a choice, result or exception;

       //findAndRun is evaluated inside the try of tryChoiceExc,

       //so an exception raised by the task is also captured

       async { return tryChoiceExc findAndRun nodeDuplicationArr }

      )

 //Run in parallel a task on all WCF nodes (MAP in parallel)     

 let mapParallelAsync channelTask arrOfnodeAddress=

   Async.Run ( Async.Parallel ( allWcfArr channelTask arrOfnodeAddress))

           

module ReduceAccTwoFun=

  

  //REDUCE

 //Accumulate the values and exceptions

  //Extract the first component of a choice

  let firstC  (eoa:Choice<'a,list<Exception>>)=

     match eoa with

       |Choice2_1 u  -> u

 //Extract the second component of a choice

  let secondC  (eoa:Choice<'a,list<Exception>>)=

     match eoa with

      |Choice2_2 v -> v

      |_ -> []

 //bind the choices with two accumulate functions:

 //1. firstAccFun and

 //2. ADD an exception element to the list

  let AccumulateChoices

    (firstAccFun:Choice<'a,list<Exception>>->'a->'a) 

    (xy:Choice<'a,Exception>)

    (acc:Choice<'a,list<Exception>>)=

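       match  xy, acc with

         |(Choice2_1 u), (Choice2_1 _) -> Choice2_1 ( firstAccFun acc u)

         //exceptions were already accumulated: keep them and skip the value

         |(Choice2_1 _), (Choice2_2 _) -> acc

         |(Choice2_2 v), _ ->

           // printfn " exception: %A" v

           // printfn "Accumulated exception: %A" (secondC acc)

           Choice2_2 (v::(secondC acc))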

           

 //REDUCE (sequential)

 //accumulate the array of choices with two accumulate functions

  let resAccC  firstAccFun initVal result=

    result

    |> Array.fold_left

      (fun  (acc:Choice<'a,list<Exception>>) x  ->

      (AccumulateChoices firstAccFun) x acc)

      initVal

  let fromChoice (resAccC:Choice<'a,list<Exception>>)=

     match resAccC with

      | Choice2_1 u -> printfn "Result value: %A" u

      | Choice2_2 exn ->exn|>List.iter (fun ex->printfn "Error: %A" ex.Message) 

           

Figure 4. Predicate and accumulate functions

           

Using the ChannelFactory class (Figure 4), we define the channel function. Its parameter is a node address. Next, we define the function predicateWithExc, which takes a node address and returns a three-case value of type OpenedRepNodesWithExc<(string->IService1)*string>. The tuple (string->IService1)*string is the argument of the function channelTask, which returns a value of type 'a. If the node can be opened, we run this task.

           

Map function

           

The function allWcfArr maps over the array of arrays of node addresses (arrOfnodeAddress). For each element of this array (nodeDuplicationArr, the addresses of one node and its duplicates), we try to open the first WCF host. On success, we continue (function continueWith) and run the channel task. Otherwise we try to open the second node in the array of duplication nodes, and so on.

We transform the result from a three-case type to a two-case type, and we apply the function tryChoiceExc, which produces a value or an exception. Using the async builder we build the asynchronous computation. The function allWcfArr produces an array of asynchronous computations (each node's response is a choice: a value or an exception).

Using the Async type we run the computations in parallel (Async.Run and Async.Parallel) and obtain an array of results of type Choice<'a,Exception>[] from the array of asynchronous computations (Async<Choice<'a,Exception>>[]).
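The parallel map can be illustrated with plain strings in place of WCF channels. This is a sketch under assumptions: it uses the names found in current FSharp.Core releases (Choice1Of2, Choice2Of2 and Async.RunSynchronously, where the listings in this article use Choice2_1, Choice2_2 and Async.Run), and `fakeRequest` is a hypothetical task that fails for one node.

```fsharp
// Run f x and capture either its value or the exception it throws.
let tryChoice (f: 'a -> 'b) (x: 'a) : Choice<'b, exn> =
    try Choice1Of2 (f x)
    with ex -> Choice2Of2 ex

// Hypothetical task standing in for a WCF call: it fails for the node "bad".
let fakeRequest (node: string) : int =
    if node = "bad" then failwith ("node down: " + node)
    else node.Length

// One asynchronous computation per node; each yields a value or an exception.
let mapParallel (task: string -> 'r) (nodes: string[]) : Choice<'r, exn>[] =
    nodes
    |> Array.map (fun node -> async { return tryChoice task node })
    |> Async.Parallel
    |> Async.RunSynchronously

let results = mapParallel fakeRequest [| "alpha"; "bad"; "gamma" |]

results
|> Array.iter (fun r ->
    match r with
    | Choice1Of2 v -> printfn "value: %d" v
    | Choice2Of2 ex -> printfn "error: %s" ex.Message)
```

Because the task is called inside `tryChoice`, a failing node produces a Choice2Of2 entry in its own slot of the result array instead of aborting the whole parallel run.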

           

Accumulating choices

           

The functions firstC and secondC extract, respectively, the first and the second component from a Choice<'a,list<Exception>>.

The map function (Figure 4) generates an array of choices (values or exceptions). We accumulate the values using an accumulate function for values (in our case firstAccFun), and we accumulate the exceptions by putting them in a list. The composition function (builder) AccumulateChoices in Figure 4 accumulates the first choice component using the function firstAccFun, and accumulates the second component (Exception) by adding each exception to a list of exceptions.

Note. To keep only the first exception instead of accumulating all of them, we would have to change the composition function AccumulateChoices.

           

Reduce with two accumulate functions

           

The function resAccC has three parameters: the first accumulate function (firstAccFun), the initial (seed) value of type Choice<'a,Exception list>, and the results of the previously used map function, an array of choices Choice<'a,Exception>[].

We accumulate the results (reduce) using Array.fold_left and the bind function AccumulateChoices. The response of each node is combined with the accumulated value, starting from the seed, which yields a final choice (a value or a list of exceptions).
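The fold with two accumulate functions can be sketched on integers. The names `accumulate` and `reduceResponses` are hypothetical, and the code assumes current FSharp.Core names (Array.fold, Choice1Of2, Choice2Of2). One design decision in this sketch is an assumption rather than something the article prescribes: once an exception has been seen, the accumulated value is dropped and later values are ignored.

```fsharp
// Combine one node response with the running accumulator.
// Values are merged with addValue while no error has been seen;
// after the first exception, only exceptions are collected.
let accumulate (addValue: 'a -> 'a -> 'a)
               (acc: Choice<'a, exn list>)
               (response: Choice<'a, exn>) : Choice<'a, exn list> =
    match response, acc with
    | Choice1Of2 v, Choice1Of2 total -> Choice1Of2 (addValue total v)
    | Choice1Of2 _, Choice2Of2 _ -> acc
    | Choice2Of2 ex, Choice1Of2 _ -> Choice2Of2 [ ex ]
    | Choice2Of2 ex, Choice2Of2 errors -> Choice2Of2 (ex :: errors)

// Sequential reduce over the array of node responses.
let reduceResponses addValue seed responses =
    responses |> Array.fold (accumulate addValue) seed

let allGood : Choice<int, exn>[] =
    [| Choice1Of2 10; Choice1Of2 20; Choice1Of2 12 |]

let someBad : Choice<int, exn>[] =
    [| Choice1Of2 10
       Choice2Of2 (System.Exception("node 2 down"))
       Choice1Of2 12
       Choice2Of2 (System.Exception("node 4 down")) |]

let total = reduceResponses (+) (Choice1Of2 0) allGood
let errors = reduceResponses (+) (Choice1Of2 0) someBad

printfn "%A" total
```

Exceptions are prepended to the list, so the most recent exception comes first in the final result.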

           

Extracting the results

           

The result of the reduce function resAccC is a value of type Choice<'a,Exception list>. We inspect it with the function fromChoice, which prints either the value or the messages of the exceptions in the list.

           

Map and reduce the task asynchronously

           

In the next figure we present an example of client code that sends a command to run a stored procedure in the backend databases on all nodes. The WCF host method returns a byte stream with an encoded DataTable. We deserialize this byte stream into a DataTable and sequentially (not in parallel) merge all the received DataTables into the initial (seed) value.

           

#light

open System

open System.Threading

open System.Data

open System.ServiceModel

open System.ComponentModel

open WcfServiceLibraryVFP

open WcfConfig.ConfigClient

let StreamToDataTable (r:byte[]) =

   let stream = new System.IO.MemoryStream(r)

   let bf = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter()

   let (ds:DataSet)=( bf.Deserialize(stream)) :?> DataSet

   ds.Tables.[0]

//1. Task (channel task)

let channelTask  ((channel,nodeUri):(string->IService1)*string) =

    let res=(channel nodeUri). FromStoreProc(connStr,"ReturnResult","181000","181990")

    //printfn "Opened  and threadId: %A"  ( "  "+Thread.CurrentThread.ManagedThreadId.ToString())

    StreamToDataTable res  // res - response from a request to the server

//2. Run in parallel a task on all WCF nodes (MAP in parallel)

// printfn "Results array: %A" result        

//run the synchronous function channelTask on a new thread

let fn=new Func<(string->IService1)*string,DataTable>(channelTask)

let channelTaskAsync ((channel,nodeUri):(string->IService1)*string) =

     Async.Run (

        async {return!

        Async.BuildPrimitive((channel,nodeUri),fn.BeginInvoke,fn.EndInvoke) })

//3. accumulator seed value: an empty DataTable as the first choice case

let (initVal:Choice<DataTable,list<Exception>>)=Choice<DataTable,list<Exception>>.Choice2_1(new DataTable() )

           

//4. REDUCE

// (the first) accumulate function

let mergeDataTable (acc:DataTable) u=

    acc.Merge(u)

    //printfn "Accumulate  and threadId: %A"  ( "  "+Thread.CurrentThread.ManagedThreadId.ToString())

    acc

//create a delegate

let fnDT=new Func<DataTable,DataTable,DataTable>(mergeDataTable)

//run the merge task on a new thread - create and run an async primitive

let mergeDataTableAsync (acc:DataTable) (u:DataTable)=

     Async.Run (

        async {return!

        Async.BuildPrimitive(acc,u,fnDT.BeginInvoke,fnDT.EndInvoke) }) 

//5. print the results

let fromChoiceTable (resAccC:Choice<DataTable,list<Exception>>)=

    match resAccC with

     | Choice2_1 acct -> printfn "Result value: %A ROWS" acct.Rows.Count

     | Choice2_2 exn ->exn|>List.iter (fun ex->printfn "Error: %A" ex.Message) 

//pipeline version

module MapReduceTableAndExcAsync=

 open ClientWithTwoAcc.ReduceAccTwoFun

 open ClientWithTwoAcc.MapTaskOnArr

 open TryOpenNode.OpenWithExc    

 //printfn "ThreadId main : %A" (Thread.CurrentThread.ManagedThreadId.ToString())

 //6. Map, reduce and show the results

 arrOfnodeAddress

   |>mapParallelAsync  (fun (channel, nodeUri)->

      channelTaskAsync (channel,nodeUri) )

   |>resAccC (fun (acc:Choice<DataTable,list<Exception>>) (u:DataTable)->

      //let adt=(firstC acc)

      mergeDataTableAsync (firstC acc) u )   //not in parallel !!!

      initVal

   |>fromChoiceTable  //finally show the results

           

//One parameter application

module MapReduceTableAndExcAsyncOne=

 open ClientWithTwoAcc.ReduceAccTwoFun

 open ClientWithTwoAcc.MapTaskOnArr

 open TryOpenNode.OpenWithExc   

 let mapReduceOneParam channelTask=

  arrOfnodeAddress

   |>mapParallelAsync  (fun (channel, nodeUri)->

     let fn=new Func<(string->IService1)*string,DataTable>(channelTask)

     Async.Run (

       async {return!

       Async.BuildPrimitive((channel,nodeUri),fn.BeginInvoke,fn.EndInvoke) })

    )

   |>resAccC (fun (acc:Choice<DataTable,list<Exception>>) (u:DataTable)->

      //let adt=(firstC acc)

      mergeDataTableAsync (firstC acc) u )   //not in parallel !!!

      initVal

   |>fromChoiceTable

 //6. Run map, reduce and show the results

 //A one parameter (byteRes) application. byteRes is a lambda function (string->byteArray).

 let MapReduceAcc byteRes =mapReduceOneParam (fun (channel,nodeUri) ->(StreamToDataTable (byteRes nodeUri)))

           

 let byteRes =(fun nodeUri -> (channel nodeUri).FromStoreProc(connStr,"ReturnResult","181000","181990"))

  

 MapReduceAcc byteRes

  

 Console.ReadKey(true)

  

           

Figure 5. The map and reduce functions that asynchronously generate a value: a DataTable or a list of exceptions

           

An example of a WCF service method call is in Figure 5. A request to a single WCF host is: IService1.FromStoreProc(connStr, "ReturnResult", "181000", "181990"). Using a lambda function of the node address (fun nodeUri -> ...) and the MapReduceAcc function, we send the same request to multiple WCF services:

MapReduceAcc  ((fun nodeUri -> (channel nodeUri).FromStoreProc(connStr,"ReturnResult","181000","181990")))

  

The functions defined in Figures 2-5 enable us to present two different client applications:

-          a pipeline version of the function composition,

-          a one parameter version.

The function StreamToDataTable (Figure 5) extracts the DataTable from the encoded byte stream.

We construct the client code in six steps:

1) The argument of the channelTask function is a tuple (channel,nodeUri). The function runs the WCF method FromStoreProc, which returns a byte stream, and extracts a DataTable from it.

2) We define an asynchronous version of the channelTask function (the function channelTaskAsync in Figure 5). We create a delegate fn and, using Async.BuildPrimitive, we specify an asynchronous computation with the methods BeginInvoke and EndInvoke.

3) We specify a seed value: an empty DataTable wrapped in the first case of the choice.

4) The function mergeDataTable merges the received DataTables. Figure 5 also shows the asynchronous version of this function (mergeDataTableAsync), which merges the responses on a new thread.

5) The function fromChoiceTable inspects the result of type Choice<DataTable,list<Exception>> and prints the number of received rows or the list of exceptions (Figure 5).

The module MapReduceTableAndExcAsync (Figure 5) contains the pipeline version of the application.

6) We compose the functions (transformations) that map the request to the nodes in parallel, accumulate (reduce) the responses and print the results.

In the next module of Figure 5, MapReduceTableAndExcAsyncOne, we present a one-parameter application:

MapReduceAcc byteRes.

The argument byteRes is a function in which the programmer changes only the name of the WCF method (FromStoreProc) and its parameters, so she or he can send a different request to the database nodes by changing a single line of code in the lambda function byteRes. The code is similar to the code for a single WCF node.
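The idea of a one-parameter application can be shown end to end with fake nodes. The following sketch is not the project code: `nodes`, `requestGroup` and `mapReduceOneParam` are hypothetical, addresses that end in "-up" are treated as reachable, an integer stands in for the DataTable, and current FSharp.Core names are assumed.

```fsharp
// Hypothetical cluster: one inner array per node, primary first, then its copy.
let nodes = [| [| "n1-down"; "n1copy-up" |]; [| "n2-up"; "n2copy-up" |] |]

// Send the request to the first reachable copy of one node.
let requestGroup (request: string -> int) (copies: string[]) : Choice<int, exn> =
    try
        match copies |> Array.tryFind (fun (a: string) -> a.EndsWith("-up")) with
        | Some address -> Choice1Of2 (request address)
        | None -> Choice2Of2 (System.Exception("No node opened"))
    with ex -> Choice2Of2 ex

// The only thing a caller supplies is the request itself.
let mapReduceOneParam (request: string -> int) : Choice<int, exn list> =
    nodes
    |> Array.map (fun copies -> async { return requestGroup request copies })
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Array.fold
        (fun acc r ->
            match r, acc with
            | Choice1Of2 v, Choice1Of2 total -> Choice1Of2 (total + v)
            | Choice1Of2 _, Choice2Of2 _ -> acc
            | Choice2Of2 ex, Choice1Of2 _ -> Choice2Of2 [ ex ]
            | Choice2Of2 ex, Choice2Of2 errs -> Choice2Of2 (ex :: errs))
        (Choice1Of2 0)

// Changing the request means changing only this lambda.
let answer = mapReduceOneParam (fun address -> address.Length)

printfn "%A" answer
```

Here the first node falls back to its copy, the second node answers directly, and the two results are summed by the fold.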

           

Conclusion

           

We can send a request in parallel to the WCF nodes (and their backend databases) with a single command, without locking and without particular attention to exception management. The code scales well: we add new nodes (databases and their duplicates) by changing only the app.config file.

We access the duplication nodes sequentially (tryFindInArrWithExc), so the first node listed in the app.config has priority. You can change the code to access the duplication nodes in parallel, without priority, and run the continuation function on the first node that opens.

The predicate predicateWithExc filters the duplication nodes using the criterion "opened", but we can change this criterion and select, for example, the least busy node (we then have to maintain a value that informs us about the state of each node).
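One possible way to probe the copies in parallel is Async.Choice, which is available in FSharp.Core 4.0 and later (it is not part of the F# release used for the listings in this article). The sketch below is only an illustration: `tryOpen` is a hypothetical probe that simulates latency and treats addresses ending in "-up" as reachable.

```fsharp
// Hypothetical probe: a real one would try to open a WCF channel here.
let tryOpen (address: string) : Async<string option> =
    async {
        do! Async.Sleep 10   // simulated network latency
        return (if address.EndsWith("-up") then Some address else None)
    }

// Probe all copies at once and keep the first one that reports success.
let firstOpened (copies: string[]) : string option =
    copies
    |> Seq.map tryOpen
    |> Async.Choice
    |> Async.RunSynchronously

let chosen = firstOpened [| "a-down"; "b-up"; "c-down" |]
let nobody = firstOpened [| "a-down"; "c-down" |]

printfn "%A / %A" chosen nobody
```

When several copies are reachable, the copy that answers first wins, so the configuration order no longer gives priority to any node.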

           


           

Download. Console application.