Accumulating choices in F#, the case of a distributed database
Author: Josip Zohil, Josip.zohil1@guest.arnes.si
Summary
In distributed relational database systems, clients access the data in parallel from multiple nodes (parallel execution servers) or from their duplicates. A request to a node can have alternative responses: values, an exception (the node is down), and other exceptions. If a node is down, the response is an exception, and we send the request to the next duplication node. The exceptions are the basis for our decisions: we accept or reject them.
In this article we present F# client code for parallel data retrieval from a distributed, read-only relational database system using Windows Communication Foundation (WCF). We use an F# data type with three choices. The requests run asynchronously and in parallel, without locks, and the response values and exceptions are managed and accumulated.
Problem
Relational database systems are known not to scale well in the classic client-server architecture. Sometimes it is possible to spread the database across different but interconnected computer systems (nodes). Each node has a relational database manager that manages the tables in its environment.
To ensure fault tolerance when a particular node breaks down, the data is made redundant: each database is stored in the system multiple times (duplication nodes).
For example, the annual data of past periods is normally immutable (e.g., annual accounting data). We can aggregate this periodic data (e.g., 10 years of annual data, 800 MB each) in one table of 8 GB. Another possibility is to retrieve the data from 10 tables in one database, or from 10 databases on the same server. An 8 GB table and its indexes are relatively large, and data retrieval takes a long time. Having 10 (smaller) databases on one computer also causes problems (disk access contention and cache misses).
Spreading the databases over different computers (nodes), with additional duplication nodes, eases the load on each database. Because of duplication, a single-site failure does not prevent the client from retrieving the data. We »accept« the node failure (exception) and proceed with the next duplication node. Extra work is needed to manage possible failures (exceptions). The client task runs in parallel on all the nodes (parallel requests). Since we have multiple nodes and duplication nodes, one request to the distributed databases can produce multiple exceptions: we have to accumulate the response values and any exceptions.
The goal is to make distributed data retrieval on a large cluster of computers (nodes) simple enough for ordinary programmers, without manual threading, locks, and so on. The programmer's intervention is minimal, and the databases appear to the user as if they were a single database.
Solution
Spreading the databases over multiple computers eases some of these problems. The nodes don't interact with each other; only the client interacts with them, by message passing. The client accesses the nodes' data by running the same request in parallel on all nodes. Some nodes can be down, so to ensure fault tolerance we duplicate each node's data on other nodes (in our example, on one additional node).
1) Normally, the client sends a request in parallel to the WCF hosts (nodes). Each WCF host dispatches the request to its database system (the backend database), receives the response from the database, and sends it as a message to the client, where the responses from the multiple (parallel) nodes are accumulated.
2) If a node is down (a communication or timeout exception), we send the same request (message) to the next duplication node. So the client code makes decisions based on the node responses (values or exceptions).
3) If all copies of a database node (the node and its duplicates) are down, we inform the user about the problem.
4) If there are exceptions, we accumulate them and generate a message for the client (user).
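The four steps can be simulated without WCF. The following F# sketch is an illustration under stated assumptions, not the article's client: the hypothetical function queryNode stands in for a request to one node, a TimeoutException plays the role of a node that is down, and the values are accumulated by summing them. It uses the current FSharp.Core case names Choice1Of2/Choice2Of2 rather than the Choice2_1/Choice2_2 of the F# release used later in the article.

```fsharp
open System

// Hypothetical node behaviour: "A1" is down, "A2" and "B1" answer with a number.
let queryNode (nodeUri: string) : int =
    match nodeUri with
    | "A1" -> raise (TimeoutException("A1 is down"))
    | "A2" -> 5
    | "B1" -> 10
    | uri -> failwithf "Unknown node %s" uri

// Steps 2 and 3: try the duplication nodes in order; a timeout means "try the next one".
let rec queryGroup (duplicationNodes: string list) : Choice<int, exn> =
    match duplicationNodes with
    | [] -> Choice2Of2 (Exception("All duplication nodes are down"))
    | nodeUri :: rest ->
        try
            Choice1Of2 (queryNode nodeUri)
        with
        | :? TimeoutException -> queryGroup rest
        | ex -> Choice2Of2 ex

// Step 4: combine the values, or collect the exceptions if there are any.
let accumulate (results: Choice<int, exn> list) : Choice<int, exn list> =
    let values = results |> List.choose (function Choice1Of2 v -> Some v | Choice2Of2 _ -> None)
    let errors = results |> List.choose (function Choice2Of2 e -> Some e | Choice1Of2 _ -> None)
    if List.isEmpty errors then Choice1Of2 (List.sum values) else Choice2Of2 errors

// Step 1: one request per database node; each group is a node followed by its duplicates.
let nodeGroups = [ [ "A1"; "A2" ]; [ "B1"; "B2" ] ]
let result = nodeGroups |> List.map queryGroup |> accumulate
printfn "%A" result
```

Group A falls back from A1 to its duplicate A2, group B is answered by B1, so the accumulated value is 5 + 10.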
In a distributed database system, performance is mainly determined by disk quality, the cache hit ratio, and network bandwidth.
F# console application
In this article we study a case where a database is spread over two nodes and two duplication nodes (copies of the primary nodes). The number of nodes is a parameter, so you can use the programs with an arbitrary number of nodes. Every node runs a WCF host. We present the client-side code as a console application in Visual Studio 2008. For pedagogical reasons, in the first part of the article we present F# code that shows the intermediate values of the function composition, and in the last part the compact F# code (with the minimal number of intermediate values).
In the client application, named DistrTwoAccFun, we add a reference to the WCF host DLL (WcfServiceLibraryVFP.dll). (The project code and the DLLs are in the download for this article.) Among others, the interface IService1 of this DLL exposes these WCF methods:
IService1.MyOperation1(string) (for testing), with a return value of type string;
IService1.FromStoreProc(string, string, string, string) (to call a stored procedure in the database), with a return value of type byte[]: a serialized DataSet whose first DataTable holds the result.
Application configuration
<appSettings>
  <add key="numberOfNodes" value="2"/>
  <add key="node1address1"
       value="net.tcp://192.168.123.165:8731/Design_Time_Addresses/WcfServiceLibraryVFP/Service1/" />
  <add key="node2ondaddress1"
       value="net.tcp://192.168.123.110:8731/Design_Time_Addresses/WcfServiceLibraryVFP/Service1/" />
  <add key="node1address2"
       value="net.tcp://192.168.123.166:8731/Design_Time_Addresses/WcfServiceLibraryVFP/Service1/" />
  <add key="node2address2"
       value="net.tcp://192.168.123.111:8731/Design_Time_Addresses/WcfServiceLibraryVFP/Service1/" />
  <add key="connStr" value="Provider=VFPOLEDB.1;Data Source=D:\moj\att2006\data;Collating Sequence=MACHINE"/>
  <add key="fileName" value="d:\moj\compose\mywcf.prg"/>
  <add key="fileSerPath" value="d:\moj\att2006\data\mywcf.prg"/>
  <add key="myNetBinding" value="NetTcpBinding_IService11"/>
</appSettings>
Figure 1. The appSettings section of the app.config file
We study the distributed database in a local area network, using a WCF messaging system based on the net.tcp protocol. The nodes are identified by their IP address, port, and WCF service address (e.g., node1address1 = net.tcp://192.168.123.165:8731/Design_Time_Addresses/WcfServiceLibraryVFP/Service1/). The duplicate of the first node is node1address2, and the duplicate of the second node is node2address2 (see app.config in Figure 1).
Besides the appSettings in Figure 1, which specify the number of nodes (2) and other key values, the app.config file also specifies the maximum number of connections (10) and the bindings (netTcpBinding).
#light
open System
open System.Configuration

module ConfigClient =
    let aps = ConfigurationManager.AppSettings
    let myNetBinding = aps.["myNetBinding"]
    let fileSerPath = aps.["fileSerPath"]
    let fileName = aps.["fileName"]
    let connStr = aps.["connStr"]
    let numberOfNodes = Convert.ToInt32(aps.["numberOfNodes"])
    let ge = aps.AllKeys
    // does the key x belong to database node i (e.g. "node1address2" for i = 1)?
    let predicate (x: string) (i: int) = x.StartsWith("node" + Convert.ToString(i))
    // all keys of node i: the node and its duplications
    let mam i = Array.FindAll(aps.AllKeys, (fun x -> predicate x i))
    // the addresses (values) stored under these keys
    let nArr (i: int) = Array.map (fun (x: string) -> aps.[x]) (mam i)
    // array of arrays: one array of duplicated node addresses per database node
    let arrOfnodeAddress = [| for i in 1..numberOfNodes do yield (nArr i) |]
Figure 2. The WcfConfig.fs file
Figure 2 shows the code that reads the configuration parameters from app.config (Figure 1). Using the predicate, we select the keys that start with the characters »node« followed by the node number, and we transform the data into an array of arrays of node addresses. The first element of the first array is the address of the first database node, the second element is the address of its first copy (duplication), and so on; the second array holds the addresses of the second database node and its copies.
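The grouping can be tried out without an app.config file. In this sketch, which is an assumption rather than the article's code, an in-memory list of hypothetical key/value pairs replaces ConfigurationManager.AppSettings, and the addresses are placeholders. It matches the prefix node{i}address instead of node{i}: with ten or more nodes, the prefix "node1" would also select keys such as node10address1. This assumes that every node key follows the node{i}address{j} pattern.

```fsharp
// Hypothetical appSettings content: keys follow the pattern node{i}address{j},
// where i is the database node and j the copy (1 = the node, 2 = its first duplicate).
let appSettings =
    [ ("numberOfNodes", "2")
      ("node1address1", "net.tcp://host-a1/Service1/")
      ("node2address1", "net.tcp://host-b1/Service1/")
      ("node1address2", "net.tcp://host-a2/Service1/")
      ("node2address2", "net.tcp://host-b2/Service1/") ]

let numberOfNodes =
    appSettings |> List.find (fun (key, _) -> key = "numberOfNodes") |> snd |> int

// Addresses of node i and its duplicates, in the order in which the keys appear.
let nodeAddresses (i: int) : string[] =
    appSettings
    |> List.filter (fun (key, _) -> key.StartsWith(sprintf "node%daddress" i))
    |> List.map snd
    |> Array.ofList

// Array of arrays: element i - 1 holds the addresses of node i and its duplicates.
let arrOfnodeAddress = [| for i in 1 .. numberOfNodes -> nodeAddresses i |]
printfn "%A" arrOfnodeAddress
```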
Types and fundamental functions
#light
open System

module OpenWithExc =
    // three choices type: opened (Some), not opened (None), Exception (Exc)
    type OpenedRepNodesWithExc<'T> =
        | Some of 'T
        | None
        | Exc of Exception
        // print the result and, if no node is opened, the duplication nodes array x
        member this.print_value x =
            match this with
            | Some a -> printfn "This node (from the array) is opened: %A" a
            | None -> printfn "All nodes are down: %A" x
            | Exc ex -> printfn "Exception - no node opened: %A" (ex.Message)

    // reduce the result from three choices to two: a value or a raised exception
    let transformResult x =
        match x with
        | Some x -> x
        | None -> raise (System.Exception("No node opened"))
        | Exc ex -> raise (System.Exception(ex.Message))

    // try to find the first node (in the array) for which the function f succeeds
    let tryFindInArrWithExc f (arr: _[]) =
        let rec loop i =
            if i >= arr.Length then None
            else
                match f arr.[i] with
                | Some x ->
                    //printfn "Node OK: %A" arr.[i]
                    Some x
                | None ->
                    //printfn "Try to open the next node: %A" arr.[i]
                    loop (i + 1)
                | Exc ex ->
                    //printfn "Exception %A" ex.Message
                    Exc ex
        loop 0

    // continuation of the task execution:
    // catch the result of a previous computation (nodeValue),
    // extract the service (if a node is opened) and execute the task
    let continueWith channelTask = (fun nodeValue ->
        match nodeValue with
        | Some service ->
            let cResult = channelTask service
            Some cResult
        | None ->
            //printfn "None"
            None
        | Exc ex ->
            //printfn "Exc"
            Exc ex)

    // catch a value or an exception and produce a two-way choice
    let tryChoiceExc tres x =
        try
            Choice2_1 (tres x)
        with exn ->
            Choice2_2 exn
Figure 3. Types and fundamental functions
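We present a generic three-choice type, OpenedRepNodesWithExc<'T> (Figure 3). We can look at it as a generalization of the Option type that can also carry an exception.
The function transformResult transforms a three-choice value into two choices: it returns the value, or raises an exception when no node was opened (None) or an exception occurred (Exc).
The function tryFindInArrWithExc (Figure 3) takes a function f and an array (of node addresses) as parameters, and recursively generates a value of type OpenedRepNodesWithExc (three choices): it returns the first Some or Exc produced by f, skipping the elements for which f returns None, and returns None if no element succeeds.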
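The three outcomes of tryFindInArrWithExc can be checked without a network. The type and the function below are copied from Figure 3; the predicate fakeOpen and the node names are hypothetical: a name starting with "down" is not opened (None), a name starting with "bad" produces an exception (Exc), and any other node is opened (Some).

```fsharp
open System

// The three-choice type of Figure 3 (it shadows Option's Some and None in this script).
type OpenedRepNodesWithExc<'T> =
    | Some of 'T
    | None
    | Exc of Exception

// Figure 3: return the first Some or Exc; skip the nodes that answer None.
let tryFindInArrWithExc f (arr: _[]) =
    let rec loop i =
        if i >= arr.Length then None
        else
            match f arr.[i] with
            | Some x -> Some x
            | None -> loop (i + 1)
            | Exc ex -> Exc ex
    loop 0

// Hypothetical predicate standing in for predicateWithExc.
let fakeOpen (nodeUri: string) =
    if nodeUri.StartsWith "down" then None
    elif nodeUri.StartsWith "bad" then Exc (Exception("cannot open " + nodeUri))
    else Some nodeUri

let r1 = tryFindInArrWithExc fakeOpen [| "down1"; "node2" |]   // first node down, its copy opens
let r2 = tryFindInArrWithExc fakeOpen [| "down1"; "down2" |]   // all duplication nodes down
let r3 = tryFindInArrWithExc fakeOpen [| "bad1"; "node2" |]    // an exception stops the search
```

Note the asymmetry: None moves on to the next duplication node, while Exc ends the search immediately, even though "node2" could have been opened.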
The »continuation« function continueWith in Figure 3 takes a function channelTask ('a -> 'b) and returns a lambda function that transforms a choice of type OpenedRepNodesWithExc<'a> into OpenedRepNodesWithExc<'b>.
The function continueWith plays the role of a continuation: it runs channelTask only if a node has been opened (Some) and passes None and Exc through unchanged, much like Option.map does for the Option type.
The function tryChoiceExc applies the function tres to x and returns a two-way choice: the result (Choice2_1) or the raised exception (Choice2_2).
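A sketch of how these functions compose for one duplication array. The helpers firstOpened and channelTask are hypothetical stand-ins for tryFindInArrWithExc with predicateWithExc and for the WCF task, and the code uses Choice1Of2/Choice2Of2, the current names of Choice2_1/Choice2_2. The whole pipeline is passed to tryChoiceExc as a function, so an exception raised by transformResult (or by the task) becomes the second choice instead of escaping.

```fsharp
open System

type OpenedRepNodesWithExc<'T> =
    | Some of 'T
    | None
    | Exc of Exception

// Three choices to two: the value, or a raised exception.
let transformResult x =
    match x with
    | Some v -> v
    | None -> raise (Exception("No node opened"))
    | Exc ex -> raise (Exception(ex.Message))

// continueWith channelTask is the lambda of Figure 3: run the task only on an opened node.
let continueWith channelTask nodeValue =
    match nodeValue with
    | Some service -> Some (channelTask service)
    | None -> None
    | Exc ex -> Exc ex

// A value, or the exception raised while computing it.
let tryChoiceExc tres x =
    try Choice1Of2 (tres x)
    with exn -> Choice2Of2 exn

// Hypothetical stand-ins: the first node not named "down..." opens; the task returns the address length.
let firstOpened (nodes: string[]) =
    let opened = nodes |> Array.filter (fun n -> not (n.StartsWith "down"))
    if opened.Length > 0 then Some opened.[0] else None

let channelTask (nodeUri: string) = nodeUri.Length

// Composition for one duplication array; the whole pipeline runs inside tryChoiceExc.
let runOnGroup (nodes: string[]) =
    tryChoiceExc (fun arr -> arr |> firstOpened |> continueWith channelTask |> transformResult) nodes

let ok = runOnGroup [| "down1"; "node22" |]
let failed = runOnGroup [| "down1"; "down2" |]
```

Here ok holds the length of "node22", and failed holds the exception raised by transformResult with the message "No node opened".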
Predicate, map and two accumulate functions
#light
open System
open System.ServiceModel
open System.ComponentModel
open WcfServiceLibraryVFP

module MapTaskOnArr =
    open WcfConfig.ConfigClient
    open TryOpenNode.OpenWithExc

    // define the channel (service) function for the client
    let cf = new ChannelFactory<IService1>(myNetBinding)
    let channel (nodeUri: string) = cf.CreateChannel(new EndpointAddress(nodeUri))

    // the filter function on a node duplication array:
    // if the node opens, it gives us the pair (channel, nodeUri)
    let predicateWithExc (nodeUri: string) =
        try
            use ichannelCurr = (channel nodeUri :> obj) :?> IClientChannel
            ichannelCurr.Open()
            match ichannelCurr.State = CommunicationState.Opened with
            | true ->
                // return a tuple (function * string)
                Some (channel, nodeUri)
            | false ->
                //printfn "Closed: %A" nodeUri
                None
        // communication or timeout exception: the node is treated as down
        with
        | :? System.ServiceModel.CommunicationException ->
            //printfn "Error thrown by comm ex: %A" nodeUri
            None
        | :? System.TimeoutException ->
            //printfn "Error thrown by timeout ex: %A" nodeUri
            None
        | _ ->
            //printfn "Error thrown by other ex: %A" nodeUri
            rethrow ()

    // the MAP function on the array of duplication arrays
    // (an array of arrays of node addresses, which are strings)
    let allWcfArr (channelTask: (string -> IService1) * string -> 'a) arrOfnodeAddress =
        arrOfnodeAddress
        |> Array.map (fun nodeDuplicationArr ->
            // one async computation per database node;
            // tryChoiceExc gives a choice: the result or an exception.
            // The whole pipeline runs inside tryChoiceExc, so exceptions
            // rethrown by predicateWithExc or raised by the task are caught too.
            async {
                return tryChoiceExc
                           (fun arr ->
                                arr
                                // sequentially find the first opened node
                                |> tryFindInArrWithExc (fun nodeUri -> predicateWithExc nodeUri)
                                // if a node is opened, run the continuation task
                                |> continueWith channelTask
                                // from three choices to two
                                |> transformResult)
                           nodeDuplicationArr })

    // run a task in parallel on all WCF nodes (MAP in parallel)
    let mapParallelAsync channelTask arrOfnodeAddress =
        Async.Run (Async.Parallel (allWcfArr channelTask arrOfnodeAddress))

module ReduceAccTwoFun =
    // REDUCE: accumulate the values and the exceptions

    // extract the first component of a choice (a value)
    let firstC (eoa: Choice<'a, list<Exception>>) =
        match eoa with
        | Choice2_1 u -> u
        | Choice2_2 _ -> failwith "firstC: the choice holds exceptions, not a value"

    // extract the second component of a choice (the list of exceptions)
    let secondC (eoa: Choice<'a, list<Exception>>) =
        match eoa with
        | Choice2_2 v -> v
        | _ -> []

    // bind the choices with two accumulate functions:
    // 1. firstAccFun for the values and
    // 2. ADD an exception element to the list;
    // once an exception has been accumulated, later values are ignored
    let AccumulateChoices (firstAccFun: Choice<'a, list<Exception>> -> 'a -> 'a)
                          (xy: Choice<'a, Exception>)
                          (acc: Choice<'a, list<Exception>>) =
        match xy, acc with
        | Choice2_1 u, Choice2_1 _ -> Choice2_1 (firstAccFun acc u)
        | Choice2_1 _, Choice2_2 _ -> acc
        | Choice2_2 v, _ ->
            // printfn "exception: %A" v
            // printfn "Accumulated exception: %A" (secondC acc)
            Choice2_2 (v :: (secondC acc))

    // REDUCE (sequential):
    // accumulate the array of choices with the two accumulate functions
    let resAccC firstAccFun initVal result =
        result
        |> Array.fold_left (fun (acc: Choice<'a, list<Exception>>) x -> AccumulateChoices firstAccFun x acc) initVal

    // print the value or the list of exceptions
    let fromChoice (resAccC: Choice<'a, list<Exception>>) =
        match resAccC with
        | Choice2_1 u -> printfn "Result value: %A" u
        | Choice2_2 exn -> exn |> List.iter (fun ex -> printfn "Error: %A" ex.Message)
Figure 4. Predicate and accumulate functions
Using the ChannelFactory class in Figure 4, we define the channel function; its parameter is a node address. Next, we define the predicateWithExc function, with the input parameter nodeUri (a node address) and a return value of the three-choice type OpenedRepNodesWithExc<(string->IService1)*string>. It returns None when the node cannot be opened (the channel is not in the Opened state, or opening it fails with a communication or timeout exception), and rethrows any other exception.
The tuple (string->IService1)*string is the argument of the function channelTask, which returns a value of type 'a. If a node has been opened, we run this task.
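The exception policy of predicateWithExc does not depend on WCF itself. In this sketch the hypothetical function openNode replaces opening the IClientChannel, and the function returns the node address instead of the (channel, nodeUri) pair. Because CommunicationException is defined in the System.ServiceModel assembly, the sketch treats only TimeoutException as "node not reachable". It uses reraise (), the current F# function for rethrowing inside a with handler.

```fsharp
open System

type OpenedRepNodesWithExc<'T> =
    | Some of 'T
    | None
    | Exc of Exception

// Hypothetical node behaviour: names starting with "slow" time out, "broken" fails
// with another error, "closed" is reachable but does not reach the Opened state.
let openNode (nodeUri: string) : bool =
    if nodeUri.StartsWith "slow" then raise (TimeoutException(nodeUri))
    elif nodeUri.StartsWith "broken" then raise (InvalidOperationException(nodeUri))
    else not (nodeUri.StartsWith "closed")

// Same policy as predicateWithExc: opened -> Some, not opened or timed out -> None,
// any other exception -> rethrown to the caller.
let predicateWithExc (nodeUri: string) =
    try
        if openNode nodeUri then Some nodeUri else None
    with
    | :? TimeoutException -> None
    | _ -> reraise ()
```

The rethrown exception is not lost: in allWcfArr it ends up in the Choice produced by tryChoiceExc for that database node.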
Map function
The function allWcfArr maps the array of arrays of node addresses (arrOfnodeAddress). For each element (nodeDuplicationArr) of this array of duplicated nodes, we try to open the first WCF host. If this succeeds, we continue (function continueWith) and run the channel task. Otherwise we try to open the second node in the array of duplication nodes, and so on.
We transform the result from the three-choice type to two choices, and apply the function tryChoiceExc, which produces a value or an exception. Using the async builder, we build the asynchronous computation. The function allWcfArr produces an array of asynchronous computations (each node's response is a choice: a value or an exception).
Using the Async type, we run the computations in parallel (Async.Run and Async.Parallel) and extract the array of results from the array of async computations (Async<Choice<'a,exn>>).
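The parallel map can be reproduced with simulated nodes. In this sketch the hypothetical queryGroup sleeps to imitate network latency and fails for group 2; it uses Async.RunSynchronously, which replaced Async.Run in later F# releases. Async.Parallel returns the results in the order of the input array, so the i-th choice still belongs to the i-th database node.

```fsharp
open System.Threading

// Hypothetical request to one database node and its duplicates:
// group 2 fails, the other groups return a value.
let queryGroup (group: int) : int =
    Thread.Sleep 100   // imitate network latency
    if group = 2 then failwith "All duplication nodes of group 2 are down"
    group * 10

let tryChoiceExc f x =
    try Choice1Of2 (f x)
    with exn -> Choice2Of2 exn

// MAP: one async computation per group; Async.Parallel starts them on the thread pool
// and returns the results in the order of the input array.
let mapParallelAsync f groups =
    groups
    |> Array.map (fun g -> async { return tryChoiceExc f g })
    |> Async.Parallel
    |> Async.RunSynchronously

let results = mapParallelAsync queryGroup [| 1; 2; 3 |]
printfn "%A" results
```

Because each failure is already turned into a Choice inside its own computation, one failing group does not cancel the other parallel requests.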
Accumulating choices
The functions firstC and secondC extract, respectively, the first and the second component of a Choice<'a,list<Exception>>.
The map function (Figure 4) generates an array of choices (values or exceptions). We accumulate the values using the accumulate function for values (in our case firstAccFun), and we accumulate the exceptions by putting them in a list. The composition function AccumulateChoices in Figure 4 accumulates the first choice component using the function firstAccFun, and accumulates the second component (an exception) by adding it to a list of exceptions.
Note: we can also »accumulate« (catch) only the first exception; to do so, we have to change the composition function AccumulateChoices.
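A sketch of the accumulation rule with current FSharp.Core names. Two choices are assumptions of this sketch: firstAccFun receives the unwrapped accumulated value instead of the choice, and once an exception has been accumulated, later values are ignored, so the result is either the combined value or the list of exceptions. The variant accumulateFirstException (a hypothetical name) keeps only the first exception, as the note suggests.

```fsharp
open System

// Fold one choice into the accumulator: values are combined with firstAccFun,
// exceptions are collected in a list; values arriving after an exception are ignored.
let accumulateChoices firstAccFun (xy: Choice<'b, exn>) (acc: Choice<'a, exn list>) =
    match xy, acc with
    | Choice1Of2 u, Choice1Of2 a -> Choice1Of2 (firstAccFun a u)
    | Choice1Of2 _, Choice2Of2 _ -> acc
    | Choice2Of2 v, Choice1Of2 _ -> Choice2Of2 [ v ]
    | Choice2Of2 v, Choice2Of2 es -> Choice2Of2 (v :: es)

// Variant from the note: keep only the first exception.
let accumulateFirstException firstAccFun xy acc =
    match accumulateChoices firstAccFun xy acc with
    | Choice2Of2 (_ :: _ :: _) -> acc   // an exception was already kept: ignore the new one
    | other -> other
```

The exception list is built by prepending, so the most recent exception comes first.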
Reduce with two accumulate functions
The function resAccC has three parameters: the first accumulate function (firstAccFun), the initial (seed) value of type Choice<'a,Exception list>, and the results of the previously applied map function, an array of choices of type Choice<'a,Exception>[].
We accumulate (reduce) the results using Array.fold_left and the function AccumulateChoices. Each node's response is folded into the seed value, resulting in a final choice (a value or a list of exceptions).
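The reduce step is one fold over the array produced by the map. This sketch uses Array.fold, the current name of Array.fold_left, a hypothetical accumulate function add that sums integers, and the seed Choice1Of2 0 in the role of the seed value; accumulateChoices repeats the accumulation rule so that the block is self-contained.

```fsharp
open System

// Accumulation rule: values are combined, exceptions are collected.
let accumulateChoices firstAccFun (xy: Choice<'b, exn>) (acc: Choice<'a, exn list>) =
    match xy, acc with
    | Choice1Of2 u, Choice1Of2 a -> Choice1Of2 (firstAccFun a u)
    | Choice1Of2 _, Choice2Of2 _ -> acc
    | Choice2Of2 v, Choice1Of2 _ -> Choice2Of2 [ v ]
    | Choice2Of2 v, Choice2Of2 es -> Choice2Of2 (v :: es)

// REDUCE: fold the array of choices produced by the map into the seed value, sequentially.
let resAccC firstAccFun initVal (results: Choice<'b, exn>[]) =
    results |> Array.fold (fun acc x -> accumulateChoices firstAccFun x acc) initVal

// Print the value or the list of exceptions.
let fromChoice result =
    match result with
    | Choice1Of2 u -> printfn "Result value: %A" u
    | Choice2Of2 errors -> errors |> List.iter (fun (ex: exn) -> printfn "Error: %s" ex.Message)

let add (a: int) (u: int) = a + u
let allOk = resAccC add (Choice1Of2 0) [| Choice1Of2 10; Choice1Of2 30 |]
let someFailed = resAccC add (Choice1Of2 0) [| Choice1Of2 10; Choice2Of2 (Exception("node 2 down")) |]
fromChoice allOk
fromChoice someFailed
```

Running it prints Result value: 40 for allOk and Error: node 2 down for someFailed.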
Extracting the results
The result of the reduce function resAccC has type Choice<'a,Exception list>. The function fromChoice extracts its components and prints either the value or the list of exceptions.
Map and reduce the task asynchronously
In the next figure we present an example of client code that sends the command to run a stored procedure in the backend databases on all nodes. The WCF host method returns a byte stream that encodes a DataSet with the result DataTable. We deserialize this byte stream into a DataTable and sequentially (not in parallel) merge all the received DataTables into the initial (seed) value.
#light
open System
open System.Threading
open System.Data
open System.ServiceModel
open System.ComponentModel
open WcfServiceLibraryVFP
open WcfConfig.ConfigClient

// deserialize the byte stream (an encoded DataSet) and return its first DataTable
let StreamToDataTable (r: byte[]) =
    use stream = new System.IO.MemoryStream(r)
    let bf = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter()
    let (ds: DataSet) = bf.Deserialize(stream) :?> DataSet
    ds.Tables.[0]

// 1. Task (channel task); res is the response to a request to the server
let channelTask ((channel, nodeUri): (string -> IService1) * string) =
    let res = (channel nodeUri).FromStoreProc(connStr, "ReturnResult", "181000", "181990")
    //printfn "Opened and threadId: %A" (" " + Thread.CurrentThread.ManagedThreadId.ToString())
    StreamToDataTable res

// 2. run the synchronous function channelTask on a thread-pool thread
//    (the MAP itself runs the task in parallel on all WCF nodes)
let fn = new Func<(string -> IService1) * string, DataTable>(channelTask)
let channelTaskAsync ((channel, nodeUri): (string -> IService1) * string) =
    Async.Run (async { return! Async.BuildPrimitive((channel, nodeUri), fn.BeginInvoke, fn.EndInvoke) })

// 3. accumulator seed value: an empty DataTable
let (initVal: Choice<DataTable, list<Exception>>) = Choice<DataTable, list<Exception>>.Choice2_1(new DataTable())

// 4. REDUCE: the (first) accumulate function merges the received DataTables
let mergeDataTable (acc: DataTable) (u: DataTable) =
    acc.Merge(u)
    //printfn "Accumulate and threadId: %A" (" " + Thread.CurrentThread.ManagedThreadId.ToString())
    acc

// create a delegate
let fnDT = new Func<DataTable, DataTable, DataTable>(mergeDataTable)
// run the merge task on a thread-pool thread: create and run an async primitive
let mergeDataTableAsync (acc: DataTable) (u: DataTable) =
    Async.Run (async { return! Async.BuildPrimitive(acc, u, fnDT.BeginInvoke, fnDT.EndInvoke) })

// 5. print the results
let fromChoiceTable (resAccC: Choice<DataTable, list<Exception>>) =
    match resAccC with
    | Choice2_1 acct -> printfn "Result value: %A ROWS" acct.Rows.Count
    | Choice2_2 exn -> exn |> List.iter (fun ex -> printfn "Error: %A" ex.Message)

// pipeline version
module MapReduceTableAndExcAsync =
    open ClientWithTwoAcc.ReduceAccTwoFun
    open ClientWithTwoAcc.MapTaskOnArr
    open TryOpenNode.OpenWithExc
    //printfn "ThreadId main: %A" (Thread.CurrentThread.ManagedThreadId.ToString())
    // 6. map, reduce and show the results
    arrOfnodeAddress
    |> mapParallelAsync (fun (channel, nodeUri) -> channelTaskAsync (channel, nodeUri))
    // reduce: not in parallel !!!
    |> resAccC (fun (acc: Choice<DataTable, list<Exception>>) (u: DataTable) -> mergeDataTableAsync (firstC acc) u) initVal
    // finally show the results
    |> fromChoiceTable

// one-parameter application
module MapReduceTableAndExcAsyncOne =
    open ClientWithTwoAcc.ReduceAccTwoFun
    open ClientWithTwoAcc.MapTaskOnArr
    open TryOpenNode.OpenWithExc
    let mapReduceOneParam channelTask =
        arrOfnodeAddress
        |> mapParallelAsync (fun (channel, nodeUri) ->
            let fn = new Func<(string -> IService1) * string, DataTable>(channelTask)
            Async.Run (async { return! Async.BuildPrimitive((channel, nodeUri), fn.BeginInvoke, fn.EndInvoke) }))
        // reduce: not in parallel !!!
        |> resAccC (fun (acc: Choice<DataTable, list<Exception>>) (u: DataTable) -> mergeDataTableAsync (firstC acc) u) initVal
        |> fromChoiceTable

    // 6. run map, reduce and show the results:
    // a one-parameter (byteRes) application; byteRes is a function string -> byte[]
    let MapReduceAcc byteRes =
        mapReduceOneParam (fun (channel, nodeUri) -> StreamToDataTable (byteRes nodeUri))
    let byteRes = (fun nodeUri -> (channel nodeUri).FromStoreProc(connStr, "ReturnResult", "181000", "181990"))
    MapReduceAcc byteRes
    Console.ReadKey(true) |> ignore
Figure 5. The map and reduce functions that asynchronously generate a value: a DataTable or a list of exceptions
An example of a WCF service method call is in Figure 5. A request to a single WCF host is IService1.FromStoreProc(connStr, "ReturnResult", "181000", "181990").
By passing this request, wrapped in a lambda function of nodeUri, to the MapReduceAcc function, we run the same request against multiple WCF services:
MapReduceAcc (fun nodeUri -> (channel nodeUri).FromStoreProc(connStr, "ReturnResult", "181000", "181990"))
The functions defined in Figures 2-5 enable us to present two different client applications:
- a pipeline version of the function composition,
- a one-parameter version.
The function StreamToDataTable (Figure 5) deserializes the DataSet encoded in the byte stream and returns its first DataTable.
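StreamToDataTable relies on BinaryFormatter, which recent .NET versions mark as obsolete and, depending on the version, disable. As an assumption, and not as the encoding used by the article's WCF host, the sketch below encodes the DataSet as XML with an inline schema (DataSet.WriteXml and DataSet.ReadXml) and decodes it again; the helpers dataTableToBytes and makeTable are hypothetical. It also shows DataTable.Merge appending the rows of each received table to the seed table, which has the same table name and no primary key.

```fsharp
open System.Data
open System.IO

// Hypothetical server side: wrap a DataTable in a DataSet and encode it as XML bytes.
let dataTableToBytes (table: DataTable) : byte[] =
    let ds = new DataSet()
    ds.Tables.Add(table)
    use stream = new MemoryStream()
    ds.WriteXml(stream, XmlWriteMode.WriteSchema)   // inline schema keeps the column types
    stream.ToArray()

// Client side: the XML counterpart of StreamToDataTable.
let bytesToDataTable (bytes: byte[]) : DataTable =
    use stream = new MemoryStream(bytes)
    let ds = new DataSet()
    ds.ReadXml(stream) |> ignore
    ds.Tables.[0]

// Hypothetical node response with n rows in one integer column.
let makeTable (n: int) : DataTable =
    let t = new DataTable("Result")
    t.Columns.Add("Id", typeof<int>) |> ignore
    for i in 1 .. n do
        t.Rows.Add([| box i |]) |> ignore
    t

// Merge the decoded responses into a seed table, as mergeDataTable does.
let seed = new DataTable("Result")
for response in [ dataTableToBytes (makeTable 2); dataTableToBytes (makeTable 3) ] do
    seed.Merge(bytesToDataTable response)
printfn "%d rows" seed.Rows.Count
```

Without a primary key, Merge cannot match rows, so the two responses simply add up to five rows in the seed table.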
We construct the client code in six steps:
1) The argument of the channelTask function is a tuple (channel, nodeUri). The function calls the WCF method FromStoreProc, which returns a byte stream, and extracts a DataTable from it.
2) We define an asynchronous version of the channelTask function (channelTaskAsync in Figure 5). We create a delegate fn and, using Async.BuildPrimitive, specify an asynchronous computation built from the delegate's BeginInvoke and EndInvoke methods.
3) We specify a seed value: an empty DataTable, Choice2_1 (new DataTable()).
4) The function mergeDataTable merges the received DataTables. Figure 5 also shows its asynchronous version, mergeDataTableAsync, which merges the responses on a thread-pool thread.
5) The function fromChoiceTable extracts the result from the generic Choice<DataTable,list<Exception>> and prints the number of received rows or the list of exceptions (Figure 5).
6) We compose the functions (transformations) that map the request to the nodes in parallel, accumulate (reduce) the responses, and print the results. The module MapReduceTableAndExcAsync (Figure 5) contains this pipeline version of the application.
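Delegate BeginInvoke and EndInvoke, on which channelTaskAsync and mergeDataTableAsync rely, are not supported on .NET Core and later .NET versions (they throw PlatformNotSupportedException). A sketch of the same idea, running a synchronous function on a thread-pool thread inside an async workflow, uses Async.SwitchToThreadPool instead; runOnThreadPool and slowTask are hypothetical names.

```fsharp
open System.Threading

// Run a synchronous function on a thread-pool thread as an async computation.
let runOnThreadPool (f: 'a -> 'b) (x: 'a) : Async<'b> =
    async {
        do! Async.SwitchToThreadPool()
        return f x
    }

// Hypothetical synchronous task standing in for channelTask: it reports its thread.
let slowTask (nodeUri: string) =
    Thread.Sleep 50
    sprintf "%s answered on thread %d" nodeUri Thread.CurrentThread.ManagedThreadId

let answers =
    [| "node1"; "node2" |]
    |> Array.map (runOnThreadPool slowTask)
    |> Async.Parallel
    |> Async.RunSynchronously

answers |> Array.iter (printfn "%s")
```

The answers keep the order of the node array, while the thread identifiers show which pool threads did the work.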
In the next module of Figure 5, MapReduceTableAndExcAsyncOne, we present a one-parameter application: MapReduceAcc byteRes.
The argument byteRes is a function in which the programmer changes only the name of the WCF method (FromStoreProc) and its parameters. She or he can therefore run a request against all the database nodes by changing a single line of code, the lambda function byteRes; the code is similar to the code for a single WCF node.
Conclusion
We can send a request in parallel to the WCF nodes (and their backend databases) with a single command, without locks and without special attention to exception management. Adding nodes requires no code changes: we add new nodes (databases and their duplicates) by changing only the app.config file.
We access the duplication nodes sequentially (tryFindInArrWithExc), so the first node listed in app.config has priority. You can change the code to access the duplication nodes in parallel, without priority, and run the continuation function on the first node that opens.
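The parallel alternative could look like this sketch, which is not part of the article's code. It assumes FSharp.Core 4.5 or later, where Async.Choice runs computations of type Async<'T option> in parallel and returns the first Some result, or None if all of them return None. The node names and delays are hypothetical, and each attempt catches its own exceptions, because an exception in one child computation makes Async.Choice fail.

```fsharp
// Hypothetical attempt to open one duplication node: the delay imitates latency,
// isUp = false means the node is down.
let tryOpen (nodeUri: string, delayMs: int, isUp: bool) : Async<string option> =
    async {
        try
            do! Async.Sleep delayMs
            return (if isUp then Some nodeUri else None)
        with _ ->
            // an exception in one child would otherwise make the whole Async.Choice fail
            return None
    }

// Try all duplication nodes of a group at the same time and keep the first one that opens.
let firstOpenedNode (group: (string * int * bool)[]) : string option =
    group |> Array.map tryOpen |> Async.Choice |> Async.RunSynchronously

let fastest = firstOpenedNode [| ("node1", 300, true); ("node1-copy", 50, true) |]
let noneUp = firstOpenedNode [| ("node2", 50, false); ("node2-copy", 80, false) |]
printfn "%A %A" fastest noneUp
```

With this design the priority given by the order in app.config disappears: the quickest node that opens wins.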
The predicate predicateWithExc selects among the replication nodes using the criterion »opened«, but we can change this criterion and, for example, select the least busy node (we would then have to maintain a value that tells us each node's state).
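A sketch of the "least busy node" criterion. It assumes a hypothetical per-node state record with an ActiveRequests counter that the client would have to maintain; the article's code keeps no such value.

```fsharp
// Hypothetical state the client would keep for each duplication node.
type NodeState = { Uri: string; Opened: bool; ActiveRequests: int }

// Choose the opened node with the fewest active requests, if any node is opened.
let leastBusyNode (group: NodeState[]) : string option =
    let opened = group |> Array.filter (fun n -> n.Opened)
    if opened.Length = 0 then None
    else
        let best = opened |> Array.minBy (fun n -> n.ActiveRequests)
        Some best.Uri

let group =
    [| { Uri = "node1"; Opened = true; ActiveRequests = 7 }
       { Uri = "node1-copy"; Opened = true; ActiveRequests = 2 }
       { Uri = "node1-copy2"; Opened = false; ActiveRequests = 0 } |]

let chosen = leastBusyNode group
printfn "%A" chosen
```

A node that is not opened is never chosen, even if its counter is the lowest, so the »opened« criterion is kept and only refined.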