-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathserver.go
More file actions
276 lines (227 loc) · 10.3 KB
/
server.go
File metadata and controls
276 lines (227 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
package modbus
import (
"fmt"
)
/*
Atomic allows locked access to the server's internal cache of coil, discrete, input, holding, and file values.
The implementation is in serverCache.go. An Atomic instance is created by calling the StartAtomic() function on the Server.

Do not Complete an atomic unless you started it. It is normal to `defer a.Complete()` immediately after starting it:

	atomic := server.StartAtomic()
	defer atomic.Complete()
	// do stuff using the atomic...
*/
type Atomic interface {
// Complete indicates that all operations in the atomic set are queued. It returns when all operations have completed.
Complete()
// execute accepts a function with no arguments and no results. Because the
// method is unexported, only types declared in this package can satisfy Atomic.
// NOTE(review): presumably this runs the function as part of the atomic set;
// the implementation is not in this file - confirm in serverCache.go.
execute(func())
}
// UpdateCoils is a function called when coils are expected to be written by request from a remote client.
// Do not Complete the atomic that is passed in.
//
// NOTE(review): presumably values holds the coil states requested by the client starting at address,
// current holds the states presently in the cache, and the returned slice is what gets stored - the
// call site is not in this file, confirm before relying on it.
type UpdateCoils func(server Server, atomic Atomic, address int, values []bool, current []bool) ([]bool, error)
// UpdateHoldings is a function called when holding registers are expected to be written by request from a remote client.
// Do not Complete the atomic that is passed in.
//
// NOTE(review): presumably values holds the register values requested by the client starting at address,
// current holds the values presently in the cache, and the returned slice is what gets stored - the
// call site is not in this file, confirm before relying on it.
type UpdateHoldings func(server Server, atomic Atomic, address int, values []int, current []int) ([]int, error)
// UpdateFile is a function called when files are expected to be written by request from a remote client.
// Do not Complete the atomic that is passed in.
//
// NOTE(review): presumably file selects the file, address is the starting record within it, values holds
// the records requested by the client and current holds the records presently in the cache - the call
// site is not in this file, confirm before relying on it.
type UpdateFile func(server Server, atomic Atomic, file int, address int, values []int, current []int) ([]int, error)
// Server represents a system that can handle an incoming request from a remote client.
//
// Most read/write operations come in two forms: one that takes an Atomic the caller has already
// started, and an ...Atomic variant that takes no Atomic argument and performs the operation atomically.
// The unexported request method means only types in this package can implement Server.
type Server interface {
// Diagnostics returns the current diagnostic counts of the server instance.
Diagnostics() ServerDiagnostics
// Busy will return true if a command is actively being handled.
Busy() bool
// StartAtomic requests that access to the internal memory model/cache (coils, registers, discretes, inputs and files)
// of the Server is granted. Only 1 transaction is active at a time, and is active until it is Completed.
StartAtomic() Atomic
// RegisterDiscretes indicates how many discretes to make available in the server memory model/cache.
RegisterDiscretes(count int)
// ReadDiscretes performs a discrete read operation as part of an existing atomic operation from the memory model/cache.
ReadDiscretes(atomic Atomic, address int, count int) ([]bool, error)
// ReadDiscretesAtomic performs an atomic ReadDiscretes.
ReadDiscretesAtomic(address int, count int) ([]bool, error)
// WriteDiscretes performs a discrete write operation as part of an existing atomic operation to the memory model/cache.
WriteDiscretes(atomic Atomic, address int, values []bool) error
// WriteDiscretesAtomic performs an atomic WriteDiscretes.
WriteDiscretesAtomic(address int, values []bool) error
// RegisterCoils indicates how many coils to make available in the server memory model/cache, and which function to call
// when a remote client attempts to update the coil settings.
RegisterCoils(count int, handler UpdateCoils)
// ReadCoils performs a coil read operation as part of an existing atomic operation from the memory model/cache.
ReadCoils(atomic Atomic, address int, count int) ([]bool, error)
// ReadCoilsAtomic performs an atomic ReadCoils.
ReadCoilsAtomic(address int, count int) ([]bool, error)
// WriteCoils performs a coil write operation as part of an existing atomic operation to the memory model/cache.
WriteCoils(atomic Atomic, address int, values []bool) error
// WriteCoilsAtomic performs an atomic WriteCoils.
WriteCoilsAtomic(address int, values []bool) error
// RegisterInputs indicates how many inputs to make available in the server memory model/cache.
RegisterInputs(count int)
// ReadInputs performs an input read operation as part of an existing atomic operation from the memory model/cache.
ReadInputs(atomic Atomic, address int, count int) ([]int, error)
// ReadInputsAtomic performs an atomic ReadInputs.
ReadInputsAtomic(address int, count int) ([]int, error)
// WriteInputs performs an input write operation as part of an existing atomic operation to the memory model/cache.
WriteInputs(atomic Atomic, address int, values []int) error
// WriteInputsAtomic performs an atomic WriteInputs.
WriteInputsAtomic(address int, values []int) error
// RegisterHoldings indicates how many holding registers to make available in the server memory model/cache, and which
// function to call when a remote client attempts to update the holding register values.
RegisterHoldings(count int, handler UpdateHoldings)
// ReadHoldings performs a holding register read operation as part of an existing atomic operation from the memory model/cache.
ReadHoldings(atomic Atomic, address int, count int) ([]int, error)
// ReadHoldingsAtomic performs an atomic ReadHoldings.
ReadHoldingsAtomic(address int, count int) ([]int, error)
// WriteHoldings performs a holding register write operation as part of an existing atomic operation to the memory model/cache.
WriteHoldings(atomic Atomic, address int, values []int) error
// WriteHoldingsAtomic performs an atomic WriteHoldings.
WriteHoldingsAtomic(address int, values []int) error
// RegisterFiles indicates how many files to make available in the server memory model/cache, and which function to call
// when a remote client attempts to update the file records.
RegisterFiles(count int, handler UpdateFile)
// ReadFileRecords performs a file records read operation as part of an existing atomic operation from the memory model/cache.
// NOTE(review): the parameters here are named (address, offset) while UpdateFile names its positions (file, address);
// presumably address selects the file and offset the first record - confirm against the implementation.
ReadFileRecords(atomic Atomic, address int, offset int, count int) ([]int, error)
// ReadFileRecordsAtomic performs an atomic ReadFileRecords.
ReadFileRecordsAtomic(address int, offset int, count int) ([]int, error)
// WriteFileRecords performs a file records write operation as part of an existing atomic operation to the memory model/cache.
WriteFileRecords(atomic Atomic, address int, offset int, values []int) error
// WriteFileRecordsAtomic performs an atomic WriteFileRecords.
WriteFileRecordsAtomic(address int, offset int, values []int) error
// request is called from the modbus layer and instructs the server to handle a request.
// It receives the function code and the request data and returns the response data or an error.
request(bus Modbus, unit byte, function byte, data []byte) ([]byte, error)
}
// requestHandler is the signature of a per-function-code handler stored in server.rhandlers.
// server.request calls it with the Modbus instance, a reader over the request data and a
// builder whose payload becomes the response; a non-nil error aborts the request.
type requestHandler func(Modbus, *dataReader, *dataBuilder) error
// checkHandler is a function that takes no arguments and reports an error.
// NOTE(review): nothing in the visible part of this file references it - check whether
// another file in the package still uses it.
type checkHandler func() error
// requestHandlerMeta describes how one Modbus function code is handled by the server.
type requestHandlerMeta struct {
// function is the function code; addRequestHandler uses it as the key in server.rhandlers.
function byte
// minSize is passed to the request reader's canRead check before the handler is invoked.
minSize int
// handler is the function invoked to process the request.
handler requestHandler
// event selects whether server.request reports the call to the diagnostics manager as an
// event (eventQueued / eventComplete / eventCounter).
event bool
}
// notEvent sets event to false on its receiver.
//
// NOTE(review): the receiver is a value, so the assignment only changes this method's own
// copy. The caller's requestHandlerMeta, and any copy stored in a map, keep their previous
// event value. Switching to a pointer receiver alone is not enough: the method could then no
// longer be called on a non-addressable value such as a function's return value.
func (rhm requestHandlerMeta) notEvent() {
rhm.event = false
}
// server is the implementation that NewServer returns as a Server.
type server struct {
// id is a private copy of the id slice passed to NewServer.
id []byte
// deviceInfo is a private copy of the deviceInfo slice passed to NewServer (at least 3 members).
deviceInfo []string
// rhandlers maps a Modbus function code to the metadata of its handler.
rhandlers map[byte]requestHandlerMeta
// discretes, coils, inputs, holdings and files back the memory model/cache.
// NOTE(review): files presumably holds one []int of records per file - confirm in serverCache.go.
discretes []bool
coils []bool
inputs []int
holdings []int
files [][]int
// atomics is created unbuffered in NewServer.
// NOTE(review): presumably consumed by manageCache to serialise atomics - confirm in serverCache.go.
atomics chan Atomic
// diag tracks message/event counts and the busy state reported by Diagnostics and Busy.
diag *serverDiagnosticManager
// updateCoils, updateHoldings and updateFiles are the handlers supplied through
// RegisterCoils, RegisterHoldings and RegisterFiles; they are nil until registered.
updateCoils UpdateCoils
updateHoldings UpdateHoldings
updateFiles UpdateFile
}
// NewServer creates a Server instance that can be bound to a Modbus instance using modbus.SetServer(...).
//
// id and deviceInfo are copied, so later changes to the caller's slices do not
// affect the server. deviceInfo must contain at least 3 members; otherwise an
// error is returned and no server is created.
//
// On success a goroutine running manageCache is started for the new server.
func NewServer(id []byte, deviceInfo []string) (Server, error) {
	if len(deviceInfo) < 3 {
		return nil, fmt.Errorf("DeviceInfo is required to have at least 3 members, not %v", deviceInfo)
	}

	s := &server{}
	s.id = make([]byte, len(id))
	copy(s.id, id)
	s.deviceInfo = make([]string, len(deviceInfo))
	copy(s.deviceInfo, deviceInfo)
	s.rhandlers = make(map[byte]requestHandlerMeta)
	s.diag = newServerDiagnosticManager()
	s.atomics = make(chan Atomic)

	// notEvent clears the event flag of a freshly registered handler and writes
	// the result back into s.rhandlers. addRequestHandler returns the metadata
	// by value, so changing only the returned copy would leave the stored entry
	// flagged as an event.
	notEvent := func(meta requestHandlerMeta) {
		meta.event = false
		s.rhandlers[meta.function] = meta
	}

	// Set up the discrete handlers
	s.addRequestHandler(0x02, 4, s.x02ReadDiscretes)

	// Set up the coil handlers
	s.addRequestHandler(0x01, 4, s.x01ReadCoils)
	s.addRequestHandler(0x05, 4, s.x05WriteSingleCoil)
	s.addRequestHandler(0x0f, 4, s.x0fWriteCoils)

	// Set up the input handlers
	s.addRequestHandler(0x04, 4, s.x04ReadInputRegisters)

	// Set up the holding register handlers
	s.addRequestHandler(0x03, 4, s.x03ReadHoldingRegisters)
	s.addRequestHandler(0x06, 4, s.x06WriteSingleHoldingRegister)
	s.addRequestHandler(0x10, 4, s.x10WriteHoldingRegisters)
	s.addRequestHandler(0x16, 6, s.x16MaskWriteHoldingRegister)
	s.addRequestHandler(0x17, 9, s.x17WriteReadHoldingRegisters)
	s.addRequestHandler(0x18, 2, s.x18ReadFIFO)

	// Set up the diagnostic handlers; these are not recorded as events.
	notEvent(s.addRequestHandler(0x07, 0, s.x07ReadExceptionStatus))
	notEvent(s.addRequestHandler(0x2b, 1, s.x2bDeviceIdentification))
	notEvent(s.addRequestHandler(0x11, 0, s.x11ReportServerID))
	notEvent(s.addRequestHandler(0x08, 2, s.x08Diagnostic))
	notEvent(s.addRequestHandler(0x0b, 0, s.x0bCommEventCounter))
	notEvent(s.addRequestHandler(0x0c, 0, s.x0cCommEventLog))

	// Set up the File handlers; these are not recorded as events either.
	notEvent(s.addRequestHandler(0x14, 1, s.x14ReadFileRecord))
	notEvent(s.addRequestHandler(0x15, 8, s.x15WriteFileRecord))

	go s.manageCache()
	return s, nil
}
// addRequestHandler registers handler for the given function code, replacing any
// existing entry, and marks it as an event handler. minsize is the minimum request
// size stored alongside it. The returned value is a copy of the stored metadata.
func (s *server) addRequestHandler(function byte, minsize int, handler requestHandler) requestHandlerMeta {
	meta := requestHandlerMeta{
		function: function,
		minSize:  minsize,
		handler:  handler,
		event:    true,
	}
	s.rhandlers[function] = meta
	return meta
}
// Diagnostics returns the current diagnostic counts of the server instance,
// as reported by its diagnostics manager.
func (s *server) Diagnostics() ServerDiagnostics {
	counts := s.diag.getDiagnostics()
	return counts
}
// Busy reports whether the diagnostics manager currently considers the server busy.
func (s *server) Busy() bool {
	active := s.diag.busy()
	return active
}
// RegisterDiscretes indicates how many discretes to make available in the server
// memory model/cache. It starts its own atomic, hands count to ensureDiscretes,
// and completes the atomic before returning.
func (s *server) RegisterDiscretes(count int) {
	txn := s.StartAtomic()
	defer txn.Complete()

	s.ensureDiscretes(txn, count)
}
// RegisterCoils indicates how many coils to make available in the server memory
// model/cache and records handler as the function to call when a remote client
// attempts to update the coil settings. Both steps happen inside one atomic that
// is completed before returning.
func (s *server) RegisterCoils(count int, handler UpdateCoils) {
	txn := s.StartAtomic()
	defer txn.Complete()

	s.ensureCoils(txn, count)
	s.updateCoils = handler
}
// RegisterInputs indicates how many inputs to make available in the server memory
// model/cache. It starts its own atomic, hands count to ensureInputs, and
// completes the atomic before returning.
func (s *server) RegisterInputs(count int) {
	txn := s.StartAtomic()
	defer txn.Complete()

	s.ensureInputs(txn, count)
}
// RegisterHoldings indicates how many holding registers to make available in the
// server memory model/cache and records handler as the function to call when a
// remote client attempts to update the holding register values. Both steps happen
// inside one atomic that is completed before returning.
func (s *server) RegisterHoldings(count int, handler UpdateHoldings) {
	txn := s.StartAtomic()
	defer txn.Complete()

	s.ensureHoldings(txn, count)
	s.updateHoldings = handler
}
// RegisterFiles indicates how many files to make available in the server memory
// model/cache and records handler as the function to call when a remote client
// attempts to update the file records. Both steps happen inside one atomic that
// is completed before returning.
func (s *server) RegisterFiles(count int, handler UpdateFile) {
	txn := s.StartAtomic()
	defer txn.Complete()

	s.ensureFiles(txn, count)
	s.updateFiles = handler
}
// request is called from the modbus layer and instructs the server to handle a request.
//
// It looks up the handler registered for the function code and fails if there is
// none. Otherwise it counts the message, checks that the request holds at least
// the handler's minimum size, runs the handler, and then checks the reader's
// remaining() result. Any error from these steps is returned without a payload.
// For handlers flagged as events the diagnostics manager is told when the event
// is queued and completed, and the event counter is bumped only on success.
func (s *server) request(mb Modbus, unit byte, function byte, request []byte) ([]byte, error) {
	meta, found := s.rhandlers[function]
	if !found {
		return nil, fmt.Errorf("Function code 0x%02x not implemented", function)
	}

	s.diag.message()
	if meta.event {
		s.diag.eventQueued()
		// Runs on every exit path below, including the error returns.
		defer s.diag.eventComplete()
	}

	reader := getReader(request)
	builder := dataBuilder{}

	if err := reader.canRead(meta.minSize); err != nil {
		return nil, err
	}
	if err := meta.handler(mb, &reader, &builder); err != nil {
		return nil, err
	}
	if err := reader.remaining(); err != nil {
		return nil, err
	}

	if meta.event {
		// a successful recorded event increments the successful event counter
		s.diag.eventCounter()
	}
	return builder.payload(), nil
}