XRootD
Loading...
Searching...
No Matches
XrdCmsState.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d C m s S t a t e . c c */
4/* */
5/* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
#include <fcntl.h>
#include <limits.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/stat.h>

#include <string>
37
39
40#include "Xrd/XrdLink.hh"
41
44#include "XrdCms/XrdCmsState.hh"
45#include "XrdCms/XrdCmsTrace.hh"
46
47#include "XrdSys/XrdSysError.hh"
48
49using namespace XrdCms;
50
51/******************************************************************************/
52/* G l o b a l s */
53/******************************************************************************/
54
56
57/******************************************************************************/
58/* C o n s t r u c t o r */
59/******************************************************************************/
60
61XrdCmsState::XrdCmsState() : mySemaphore(0)
62{
63 minNodeCnt = 1;
64 numActive = 0;
65 numStaging = 0;
66 currState = All_NoStage | All_Suspend;
67 prevState = 0;
70 feOK = 0;
71 noSpace = 0;
72 adminNoStage = 0;
73 adminSuspend = 0;
74 NoStageFile = "";
75 SuspendFile = "";
76 isMan = 0;
77 dataPort = 0;
78 Enabled = 0;
79}
80
81/******************************************************************************/
82/* Public E n a b l e */
83/******************************************************************************/
84
86{
87 struct stat buff;
88
89// Set correct admin staging state
90//
91 Update(Stage, stat(NoStageFile, &buff));
92
93// Set correct admin suspend state
94//
95 Update(Active, stat(SuspendFile, &buff));
96
97// We will force the information to be sent to interested parties by making
98// the previous state different from the current state and enabling ourselves.
99//
100 myMutex.Lock();
101 Enabled = 1;
102 prevState = ~currState;
103 mySemaphore.Post();
104 myMutex.UnLock();
105}
106
107/******************************************************************************/
108/* Public M o n i t o r */
109/******************************************************************************/
110
// Body of the state monitor: waits on mySemaphore, takes a snapshot of the
// state under myMutex, and reports any change.
//
// NOTE(review): the function header is missing from this listing (the class
// declares "void * Monitor()"), and so is listing line 127, which presumably
// opened a brace and an "if" matching the "else" below -- the braces in this
// body do not balance without it. Restore both from the original source
// before building; the missing condition cannot be determined from here.
//
112{
113 CmsStatusRequest myStatus = {{0, kYR_status, 0, 0}};
114 int RTsend, theState, Changes, myPort;
115
116// Do this forever (we are only posted when finally enabled)
117//
// Each pass copies the shared fields while holding the lock and marks the
// current state as reported (prevState = currState); everything after the
// UnLock() works on the local copies only.
//
118 do {mySemaphore.Wait();
119 myMutex.Lock();
120 Changes = currState ^ prevState;
121 theState = currState;
122 prevState = currState;
123 myPort = dataPort;
124 myMutex.UnLock();
125
// Status() turns the changed bits into a request modifier (and logs the
// change); a zero modifier means there is nothing to report.
//
126 if (Changes && (myStatus.Hdr.modifier = Status(Changes, theState)))
// One branch places the data port, in network byte order, into streamid and
// always requests a send; the other clears streamid and requests a send only
// when isMan is positive and the server-suspend bit is set.
//
128 {myStatus.Hdr.streamid = htonl(myPort); RTsend = 1;}
129 else {myStatus.Hdr.streamid = 0;
130 RTsend = (isMan > 0 ? (theState & SRV_Suspend) : 0);
131 }
// The status is sent through RTable only when isMan is set and a send was
// requested; XrdCmsManager::Inform() is called with the header either way.
//
132 if (isMan && RTsend)
133 RTable.Send("status", (char *)&myStatus, sizeof(myStatus));
134 XrdCmsManager::Inform(myStatus.Hdr);
135 }
136 } while(1);
137
138// All done
139//
// Not reached: the loop above never terminates.
//
140 return (void *)0;
141}
142
143/******************************************************************************/
144/* Public P o r t */
145/******************************************************************************/
146
148{
149 int xPort;
150
151 myMutex.Lock();
152 xPort = dataPort;
153 myMutex.UnLock();
154 return xPort;
155}
156
157/******************************************************************************/
158/* Public s e n d S t a t e */
159/******************************************************************************/
160
162{
163 CmsStatusRequest myStatus = {{0, kYR_status, 0, 0}};
164
165 myMutex.Lock();
166 myStatus.Hdr.modifier = Suspended
169
170 myStatus.Hdr.modifier |= NoStaging
173
174 lp->Send((char *)&myStatus.Hdr, sizeof(myStatus.Hdr));
175 myMutex.UnLock();
176}
177
178/******************************************************************************/
179/* Public S e t */
180/******************************************************************************/
181
182void XrdCmsState::Set(int ncount)
183{
184
185// Set the node count (this requires a lock)
186//
187 myMutex.Lock();
188 minNodeCnt = ncount;
189 myMutex.UnLock();
190}
191
192/******************************************************************************/
193
194void XrdCmsState::Set(int ncount, int isman, const char *AdminPath)
195{
196 char fnbuff[1048];
197 int i;
198
199// This is a configuration call no locks are required.
200//
201 minNodeCnt = ncount;
202 isMan = isman;
203 i = strlen(AdminPath);
204 strcpy(fnbuff, AdminPath);
205 if (AdminPath[i-1] != '/') fnbuff[i++] = '/';
206 strcpy(fnbuff+i, "NOSTAGE");
207 NoStageFile = strdup(fnbuff);
208 strcpy(fnbuff+i, "SUSPEND");
209 SuspendFile = strdup(fnbuff);
210}
211
212/******************************************************************************/
213/* Private S t a t u s */
214/******************************************************************************/
215
216unsigned char XrdCmsState::Status(int Changes, int theState)
217{
218 const char *SRstate = 0, *SNstate = 0;
219 unsigned char rrModifier;
220
221// Check for suspend changes
222//
223 if (Changes & All_Suspend)
224 if (theState & All_Suspend)
225 {rrModifier = CmsStatusRequest::kYR_Suspend;
226 SRstate = "suspended";
227 } else {
228 rrModifier = CmsStatusRequest::kYR_Resume;
229 SRstate = "active";
230 }
231 else rrModifier = 0;
232
233// Check for staging changes
234//
235 if (Changes & All_NoStage)
236 {if (theState & All_NoStage)
237 {rrModifier |= CmsStatusRequest::kYR_noStage;
238 SNstate = "+ nostaging";
239 } else {
240 rrModifier |= CmsStatusRequest::kYR_Stage;
241 SNstate = "+ staging";
242 }
243 }
244
245// Report and return status
246//
247 if (rrModifier)
248 {if (!SRstate && SNstate) SNstate += 2;
249 Say.Emsg("State", "Status changed to", SRstate, SNstate);
250 }
251 return rrModifier;
252}
253
254/******************************************************************************/
255/* Public U p d a t e */
256/******************************************************************************/
257
258void XrdCmsState::Update(StateType StateT, int ActivCnt, int StageCnt)
259{
260 EPNAME("Update");
261 const char *What;
262 char newVal;
263
264// Create new state
265//
266 myMutex.Lock();
267 switch(StateT)
268 {case Active: if ((newVal = ActivCnt ? 0 : 1) != adminSuspend)
269 { if ( newVal && !StageCnt) unlink(SuspendFile);
270 else if (!newVal || !StageCnt) unlink(SuspendFile);
271 else close(open(SuspendFile, O_WRONLY|O_CREAT,
272 S_IRUSR|S_IWUSR));
273 adminSuspend = newVal;
274 }
275 What = "Active";
276 break;
277 case Counts: numStaging += StageCnt;
278 numActive += ActivCnt;
279 What = "Counts";
280 break;
281 case FrontEnd: if ((feOK = (ActivCnt ? 1 : 0)) && StageCnt >= 0)
282 dataPort = StageCnt;
283 What = "FrontEnd";
284 break;
285 case Space: noSpace = (ActivCnt ? 0 : 1);
286 What = "Space";
287 break;
288 case Stage: if ((newVal = ActivCnt ? 0 : 1) != adminNoStage)
289 {if (newVal) unlink(NoStageFile);
290 else close(open(NoStageFile, O_WRONLY|O_CREAT,
291 S_IRUSR|S_IWUSR));
292 adminNoStage = newVal;
293 }
294 What = "Stage";
295 break;
296 default: Say.Emsg("State", "Invalid state update");
297 What = "Unknown";
298 break;
299 }
300
301 DEBUG(What <<" Parm1=" <<ActivCnt <<" Parm2=" <<StageCnt);
302 currState=(numActive < minNodeCnt || adminSuspend ? SRV_Suspend:0)
303 |(numStaging < 1 || noSpace || adminNoStage ? All_NoStage:0)
304 | ( !feOK ? FES_Suspend:0);
305
306 Suspended = currState & All_Suspend;
307 NoStaging = currState & All_NoStage;
308
309// If any changes are noted then we must send out notifications
310//
311 if (currState != prevState && Enabled) mySemaphore.Post();
312
313// All done
314//
315 myMutex.UnLock();
316}
#define DEBUG(x)
#define EPNAME(x)
#define close(a)
Definition XrdPosix.hh:43
#define open
Definition XrdPosix.hh:71
#define unlink(a)
Definition XrdPosix.hh:108
#define stat(a, b)
Definition XrdPosix.hh:96
static void Inform(const char *What, const char *Data, int Dlen)
static const char FES_Suspend
static const char All_NoStage
void * Monitor()
void Update(StateType StateT, int ActivVal, int StageVal=0)
static const char All_Suspend
static const char SRV_Suspend
void Set(int ncount)
void Enable()
void sendState(XrdLink *Link)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
kXR_char modifier
Definition YProtocol.hh:85
XrdSysError Say
XrdCmsRTable RTable
XrdCmsState CmsState
kXR_unt32 streamid
Definition YProtocol.hh:83
@ kYR_status
Definition YProtocol.hh:112