-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathsession.go
358 lines (323 loc) · 10.1 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
// Copyright 2020 Zhizhesihai (Beijing) Technology Limited.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package zetta
import (
"container/list"
"fmt"
"log"
"strings"
"sync"
"time"
"github.com/zhihu/zetta-client-go/utils/retry"
tspb "github.com/zhihu/zetta-proto/pkg/tablestore"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
)
// sessionHandle is an interface for transactions to access Cloud Spanner sessions safely. It is generated by sessionPool.take().
type sessionHandle struct {
// mu guarantees that inner session object is returned / destroyed only once.
mu sync.Mutex
// session is a pointer to a session object. Transactions never need to access it directly.
session *session
}
// recycle gives the inner session object back to its home session pool. It is safe to call recycle multiple times but only the first one would take effect.
func (sh *sessionHandle) recycle() {
sh.mu.Lock()
defer sh.mu.Unlock()
if sh.session == nil {
// sessionHandle has already been recycled.
return
}
sh.session.recycle()
sh.session = nil
}
// getID gets the Cloud Spanner session ID from the internal session object. getID returns empty string if the sessionHandle is nil or the inner session
// object has been released by recycle / destroy.
func (sh *sessionHandle) getID() string {
sh.mu.Lock()
defer sh.mu.Unlock()
if sh.session == nil {
// sessionHandle has already been recycled/destroyed.
return ""
}
return sh.session.getID()
}
// getClient gets the Cloud Spanner RPC client associated with the session ID in sessionHandle.
func (sh *sessionHandle) getClient() tspb.TablestoreClient {
sh.mu.Lock()
defer sh.mu.Unlock()
if sh.session == nil {
return nil
}
return sh.session.client
}
// getTransactionID returns the transaction id in the session if available.
func (sh *sessionHandle) getTransactionID() transactionID {
sh.mu.Lock()
defer sh.mu.Unlock()
if sh.session == nil {
return nil
}
return sh.session.tx
}
// destroy destroys the inner session object. It is safe to call destroy multiple times and only the first call would attempt to
// destroy the inner session object.
func (sh *sessionHandle) destroy() {
sh.mu.Lock()
s := sh.session
sh.session = nil
sh.mu.Unlock()
if s == nil {
// sessionHandle has already been destroyed.
return
}
s.destroy(false)
}
//
// session wraps a Zetta Server session ID through which transactions are created and executed.
//
type session struct {
// one session lead to one database
db string
// pb client is the RPC channel to Server
client tspb.TablestoreClient
// id is the unique id of the session in Server, allocate in creation
id string
// createTime is the timestamp of the session's creation
createTime time.Time
mu sync.Mutex
valid bool
// tx contains the transaction id if the session has been prepared for write
tx transactionID
// pool fileds
// pool is the session's home session pool where it was created. It is set only once during session's creation.
pool *sessionPool
// hcIndex is the index of the session inside the global healthcheck queue. If hcIndex < 0, session has been unregistered from the queue.
hcIndex int
// idleList is the linkedlist node which links the session to its home session pool's idle list. If idleList == nil, the
// session is not in idle list.
idleList *list.Element
// nextCheck is the timestamp of next scheduled healthcheck of the session. It is maintained by the global health checker.
nextCheck time.Time
// checkingHelath is true if currently this session is being processed by health checker. Must be modified under health checker lock.
checkingHealth bool
}
func (s *session) isValid() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.valid
}
// isWritePrepared returns true if the session is prepared for write.
func (s *session) isWritePrepared() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.tx != nil
}
func (s *session) String() string {
s.mu.Lock()
defer s.mu.Unlock()
return fmt.Sprintf("[session]: id=%v, valid=%v, create=%v", s.id, s.valid, s.createTime)
}
// ping verifies if the session is still alive in Server
func (s *session) ping() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
return retry.Invoke(ctx, func(ctx context.Context, settings retry.CallSettings) error {
_, err := s.client.GetSession(ctx, &tspb.GetSessionRequest{Name: s.getID()})
return err
})
}
// invalidate marks a session as invalid and returns the old validity.
func (s *session) invalidate() bool {
s.mu.Lock()
defer s.mu.Unlock()
ov := s.valid
s.valid = false
return ov
}
// setTransactionID sets the transaction id in the session
func (s *session) setTransactionID(tx transactionID) {
s.mu.Lock()
defer s.mu.Unlock()
s.tx = tx
}
// getID returns the session ID which uniquely identifies the session in Server
func (s *session) getID() string {
s.mu.Lock()
defer s.mu.Unlock()
return s.id
}
func (s *session) getClient() tspb.TablestoreClient {
s.mu.Lock()
defer s.mu.Unlock()
return s.client
}
func (s *session) getTransactionID() transactionID {
s.mu.Lock()
defer s.mu.Unlock()
return s.tx
}
// destroy removes the session from Server
func (s *session) destroy(isExpire bool) bool {
// Remove s from session pool.
if !s.pool.remove(s, isExpire) {
return false
}
// Unregister s from healthcheck queue.
s.pool.hc.unregister(s)
// Remove s from Cloud Spanner service.
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
s.delete(ctx)
return true
}
func (s *session) delete(ctx context.Context) {
// Ignore the error because even if we fail to explicitly destroy the
// session, it will be eventually garbage collected by Cloud Spanner.
if err := retry.Invoke(ctx, func(ctx context.Context, settings retry.CallSettings) error {
_, e := s.client.DeleteSession(ctx, &tspb.DeleteSessionRequest{Name: s.getID()})
return e
}); err != nil {
if err != nil {
log.Printf("Failed to delete session %v. Error: %v", s.getID(), err)
}
}
}
// prepareForWrite prepares the session for write if it is not already in that state.
func (s *session) prepareForWrite(ctx context.Context) error {
if s.isWritePrepared() {
return nil
}
tx, err := beginTransaction(ctx, s.getID(), s.client)
if err != nil {
return err
}
s.setTransactionID(tx)
return nil
}
func (s *session) recycle() {
s.setTransactionID(nil)
if !s.pool.recycle(s) {
// s is rejected by its home session pool because it expired and the
// session pool currently has enough open sessions.
s.destroy(false)
}
}
//
// functions for session pool
//
// refreshIdle refreshes the session's session ID if it is in its home session pool's idle list
// and returns true if successful.
func (s *session) refreshIdle() bool {
s.mu.Lock()
validAndIdle := s.valid && s.idleList != nil
s.mu.Unlock()
if !validAndIdle {
// Optimization: return early if s is not valid or if s is not in idle list.
return false
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var sid string
err := retry.Invoke(ctx, func(ctx context.Context, settings retry.CallSettings) error {
session, e := s.client.CreateSession(ctx, &tspb.CreateSessionRequest{Database: s.pool.db})
if e != nil {
return e
}
sid = session.Name
return nil
})
if err != nil {
return false
}
s.pool.mu.Lock()
s.mu.Lock()
var recycle bool
if s.valid && s.idleList != nil {
// session is in idle list, refresh its session id.
sid, s.id = s.id, sid
if s.tx != nil {
s.tx = nil
s.pool.idleWriteList.Remove(s.idleList)
// We need to put this session back into the pool.
recycle = true
}
}
s.mu.Unlock()
s.pool.mu.Unlock()
if recycle {
s.pool.recycle(s)
}
// If we fail to explicitly destroy the session, it will be eventually garbage collected by
// Cloud Spanner.
if err = retry.Invoke(ctx, func(ctx context.Context, settings retry.CallSettings) error {
_, e := s.client.DeleteSession(ctx, &tspb.DeleteSessionRequest{Name: sid})
return e
}); err != nil {
return false
}
return true
}
// setHcIndex atomically sets the session's index in the healthcheck queue and returns the old index.
func (s *session) setHcIndex(i int) int {
s.mu.Lock()
defer s.mu.Unlock()
oi := s.hcIndex
s.hcIndex = i
return oi
}
// setIdleList atomically sets the session's idle list link and returns the old link.
func (s *session) setIdleList(le *list.Element) *list.Element {
s.mu.Lock()
defer s.mu.Unlock()
old := s.idleList
s.idleList = le
return old
}
// setNextCheck sets the timestamp for next healthcheck on the session.
func (s *session) setNextCheck(t time.Time) {
s.mu.Lock()
defer s.mu.Unlock()
s.nextCheck = t
}
// getHcIndex returns the session's index into the global healthcheck priority queue.
func (s *session) getHcIndex() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.hcIndex
}
// getIdleList returns the session's link in its home session pool's idle list.
func (s *session) getIdleList() *list.Element {
s.mu.Lock()
defer s.mu.Unlock()
return s.idleList
}
// getNextCheck returns the timestamp for next healthcheck on the session.
func (s *session) getNextCheck() time.Time {
s.mu.Lock()
defer s.mu.Unlock()
return s.nextCheck
}
// shouldDropSession returns true if a particular error leads to the removal of a session
func shouldDropSession(err error) bool {
if err == nil {
return false
}
// If a Cloud Spanner can no longer locate the session (for example, if session is garbage collected), then caller
// should not try to return the session back into the session pool.
// TODO: once gRPC can return auxilary error information, stop parsing the error message.
if ErrCode(err) == codes.NotFound && strings.Contains(ErrDesc(err), "Session not found:") {
return true
}
return false
}