1 : #include "shared.hh"
2 : #include "local-store.hh"
3 : #include "util.hh"
4 : #include "serialise.hh"
5 : #include "worker-protocol.hh"
6 : #include "archive.hh"
7 : #include "globals.hh"
8 :
9 : #include <iostream>
10 : #include <unistd.h>
11 : #include <signal.h>
12 : #include <sys/types.h>
13 : #include <sys/wait.h>
14 : #include <sys/stat.h>
15 : #include <sys/socket.h>
16 : #include <sys/un.h>
17 : #include <fcntl.h>
18 : #include <errno.h>
19 :
20 : using namespace nix;
21 :
22 :
23 : #ifndef SIGPOLL
24 : #define SIGPOLL SIGIO
25 : #endif
26 :
27 :
28 139 : static FdSource from(STDIN_FILENO);
29 139 : static FdSink to(STDOUT_FILENO);
30 :
31 : bool canSendStderr;
32 : pid_t myPid;
33 :
34 :
35 :
36 : /* This function is called anytime we want to write something to
37 : stderr. If we're in a state where the protocol allows it (i.e.,
38 : when canSendStderr), send the message to the client over the
39 : socket. */
40 346 : static void tunnelStderr(const unsigned char * buf, size_t count)
41 : {
42 : /* Don't send the message to the client if we're a child of the
43 : process handling the connection. Otherwise we could screw up
44 : the protocol. It's up to the parent to redirect stderr and
45 : send it to the client somehow (e.g., as in build.cc). */
46 346 : if (canSendStderr && myPid == getpid()) {
47 : try {
48 254 : writeInt(STDERR_NEXT, to);
49 254 : writeString(string((char *) buf, count), to);
50 0 : } catch (...) {
51 : /* Write failed; that means that the other side is
52 : gone. */
53 0 : canSendStderr = false;
54 0 : throw;
55 : }
56 : } else
57 92 : writeFull(STDERR_FILENO, buf, count);
58 346 : }
59 :
60 :
61 : /* Return true if the remote side has closed its end of the
62 : connection, false otherwise. Should not be called on any socket on
63 : which we expect input! */
64 550 : static bool isFarSideClosed(int socket)
65 : {
66 : struct timeval timeout;
67 550 : timeout.tv_sec = timeout.tv_usec = 0;
68 :
69 : fd_set fds;
70 550 : FD_ZERO(&fds);
71 550 : FD_SET(socket, &fds);
72 :
73 550 : while (select(socket + 1, &fds, 0, 0, &timeout) == -1)
74 0 : if (errno != EINTR) throw SysError("select()");
75 :
76 550 : if (!FD_ISSET(socket, &fds)) return false;
77 :
78 : /* Destructive read to determine whether the select() marked the
79 : socket as readable because there is actual input or because
80 : we've reached EOF (i.e., a read of size 0 is available). */
81 : char c;
82 : int rd;
83 0 : if ((rd = read(socket, &c, 1)) > 0)
84 0 : throw Error("EOF expected (protocol error?)");
85 0 : else if (rd == -1 && errno != ECONNRESET)
86 0 : throw SysError("expected connection reset or EOF");
87 :
88 0 : return true;
89 : }
90 :
91 :
92 : /* A SIGPOLL signal is received when data is available on the client
93 : communication scoket, or when the client has closed its side of the
94 : socket. This handler is enabled at precisely those moments in the
95 : protocol when we're doing work and the client is supposed to be
96 : quiet. Thus, if we get a SIGPOLL signal, it means that the client
97 : has quit. So we should quit as well.
98 :
99 : Too bad most operating systems don't support the POLL_HUP value for
100 : si_code in siginfo_t. That would make most of the SIGPOLL
101 : complexity unnecessary, i.e., we could just enable SIGPOLL all the
102 : time and wouldn't have to worry about races. */
103 0 : static void sigPollHandler(int sigNo)
104 : {
105 : try {
106 : /* Check that the far side actually closed. We're still
107 : getting spurious signals every once in a while. I.e.,
108 : there is no input available, but we get a signal with
109 : POLL_IN set. Maybe it's delayed or something. */
110 0 : if (isFarSideClosed(from.fd)) {
111 0 : if (!blockInt) {
112 0 : _isInterrupted = 1;
113 0 : blockInt = 1;
114 0 : canSendStderr = false;
115 0 : string s = "SIGPOLL\n";
116 0 : write(STDERR_FILENO, s.c_str(), s.size());
117 : }
118 : } else {
119 0 : string s = "spurious SIGPOLL\n";
120 0 : write(STDERR_FILENO, s.c_str(), s.size());
121 : }
122 : }
123 0 : catch (Error & e) {
124 : /* Shouldn't happen. */
125 0 : string s = "impossible: " + e.msg() + '\n';
126 0 : write(STDERR_FILENO, s.c_str(), s.size());
127 0 : throw;
128 : }
129 0 : }
130 :
131 :
132 1192 : static void setSigPollAction(bool enable)
133 : {
134 : struct sigaction act, oact;
135 1192 : act.sa_handler = enable ? sigPollHandler : SIG_IGN;
136 1192 : sigfillset(&act.sa_mask);
137 1192 : act.sa_flags = 0;
138 1192 : if (sigaction(SIGPOLL, &act, &oact))
139 0 : throw SysError("setting handler for SIGPOLL");
140 1192 : }
141 :
142 :
143 : /* startWork() means that we're starting an operation for which we
144 : want to send out stderr to the client. */
145 550 : static void startWork()
146 : {
147 550 : canSendStderr = true;
148 :
149 : /* Handle client death asynchronously. */
150 550 : setSigPollAction(true);
151 :
152 : /* Of course, there is a race condition here: the socket could
153 : have closed between when we last read from / wrote to it, and
154 : between the time we set the handler for SIGPOLL. In that case
155 : we won't get the signal. So do a non-blocking select() to find
156 : out if any input is available on the socket. If there is, it
157 : has to be the 0-byte read that indicates that the socket has
158 : closed. */
159 550 : if (isFarSideClosed(from.fd)) {
160 0 : _isInterrupted = 1;
161 0 : checkInterrupt();
162 : }
163 550 : }
164 :
165 :
166 : /* stopWork() means that we're done; stop sending stderr to the
167 : client. */
168 550 : static void stopWork(bool success = true, const string & msg = "")
169 : {
170 : /* Stop handling async client death; we're going to a state where
171 : we're either sending or receiving from the client, so we'll be
172 : notified of client death anyway. */
173 550 : setSigPollAction(false);
174 :
175 550 : canSendStderr = false;
176 :
177 550 : if (success)
178 548 : writeInt(STDERR_LAST, to);
179 : else {
180 2 : writeInt(STDERR_ERROR, to);
181 2 : writeString(msg, to);
182 : }
183 550 : }
184 :
185 :
186 : struct TunnelSink : Sink
187 0 : {
188 : Sink & to;
189 0 : TunnelSink(Sink & to) : to(to)
190 : {
191 0 : }
192 : virtual void operator ()
193 0 : (const unsigned char * data, unsigned int len)
194 : {
195 0 : writeInt(STDERR_WRITE, to);
196 0 : writeString(string((const char *) data, len), to);
197 0 : }
198 : };
199 :
200 :
201 : struct TunnelSource : Source
202 0 : {
203 : Source & from;
204 0 : TunnelSource(Source & from) : from(from)
205 : {
206 0 : }
207 : virtual void operator ()
208 0 : (unsigned char * data, unsigned int len)
209 : {
210 : /* Careful: we're going to receive data from the client now,
211 : so we have to disable the SIGPOLL handler. */
212 0 : setSigPollAction(false);
213 0 : canSendStderr = false;
214 :
215 0 : writeInt(STDERR_READ, to);
216 0 : writeInt(len, to);
217 0 : string s = readString(from);
218 0 : if (s.size() != len) throw Error("not enough data");
219 0 : memcpy(data, (const unsigned char *) s.c_str(), len);
220 :
221 0 : startWork();
222 0 : }
223 : };
224 :
225 :
226 : static void performOp(unsigned int clientVersion,
227 458 : Source & from, Sink & to, unsigned int op)
228 : {
229 458 : switch (op) {
230 :
231 : #if 0
232 : case wopQuit: {
233 : /* Close the database. */
234 : store.reset((StoreAPI *) 0);
235 : writeInt(1, to);
236 : break;
237 : }
238 : #endif
239 :
240 : case wopIsValidPath: {
241 58 : Path path = readStorePath(from);
242 58 : startWork();
243 58 : bool result = store->isValidPath(path);
244 58 : stopWork();
245 58 : writeInt(result, to);
246 58 : break;
247 : }
248 :
249 : case wopHasSubstitutes: {
250 12 : Path path = readStorePath(from);
251 12 : startWork();
252 12 : bool result = store->hasSubstitutes(path);
253 12 : stopWork();
254 12 : writeInt(result, to);
255 12 : break;
256 : }
257 :
258 : case wopQueryPathHash: {
259 0 : Path path = readStorePath(from);
260 0 : startWork();
261 0 : Hash hash = store->queryPathHash(path);
262 0 : stopWork();
263 0 : writeString(printHash(hash), to);
264 0 : break;
265 : }
266 :
267 : case wopQueryReferences:
268 : case wopQueryReferrers: {
269 0 : Path path = readStorePath(from);
270 0 : startWork();
271 0 : PathSet paths;
272 0 : if (op == wopQueryReferences)
273 0 : store->queryReferences(path, paths);
274 : else
275 0 : store->queryReferrers(path, paths);
276 0 : stopWork();
277 0 : writeStringSet(paths, to);
278 0 : break;
279 : }
280 :
281 : case wopQueryDeriver: {
282 0 : Path path = readStorePath(from);
283 0 : startWork();
284 0 : Path deriver = store->queryDeriver(path);
285 0 : stopWork();
286 0 : writeString(deriver, to);
287 0 : break;
288 : }
289 :
290 : case wopAddToStore: {
291 : /* !!! uberquick hack */
292 42 : string baseName = readString(from);
293 42 : bool fixed = readInt(from) == 1;
294 42 : bool recursive = readInt(from) == 1;
295 42 : string hashAlgo = readString(from);
296 :
297 42 : Path tmp = createTempDir();
298 42 : AutoDelete delTmp(tmp);
299 42 : Path tmp2 = tmp + "/" + baseName;
300 42 : restorePath(tmp2, from);
301 :
302 42 : startWork();
303 42 : Path path = store->addToStore(tmp2, fixed, recursive, hashAlgo);
304 42 : stopWork();
305 :
306 42 : writeString(path, to);
307 42 : break;
308 : }
309 :
310 : case wopAddTextToStore: {
311 74 : string suffix = readString(from);
312 74 : string s = readString(from);
313 74 : PathSet refs = readStorePaths(from);
314 74 : startWork();
315 74 : Path path = store->addTextToStore(suffix, s, refs);
316 74 : stopWork();
317 74 : writeString(path, to);
318 74 : break;
319 : }
320 :
321 : case wopExportPath: {
322 0 : Path path = readStorePath(from);
323 0 : bool sign = readInt(from) == 1;
324 0 : startWork();
325 0 : TunnelSink sink(to);
326 0 : store->exportPath(path, sign, sink);
327 0 : stopWork();
328 0 : writeInt(1, to);
329 0 : break;
330 : }
331 :
332 : case wopImportPath: {
333 0 : startWork();
334 0 : TunnelSource source(from);
335 0 : Path path = store->importPath(true, source);
336 0 : stopWork();
337 0 : writeString(path, to);
338 0 : break;
339 : }
340 :
341 : case wopBuildDerivations: {
342 56 : PathSet drvs = readStorePaths(from);
343 56 : startWork();
344 56 : store->buildDerivations(drvs);
345 54 : stopWork();
346 54 : writeInt(1, to);
347 54 : break;
348 : }
349 :
350 : case wopEnsurePath: {
351 44 : Path path = readStorePath(from);
352 44 : startWork();
353 44 : store->ensurePath(path);
354 44 : stopWork();
355 44 : writeInt(1, to);
356 44 : break;
357 : }
358 :
359 : case wopAddTempRoot: {
360 26 : Path path = readStorePath(from);
361 26 : startWork();
362 26 : store->addTempRoot(path);
363 26 : stopWork();
364 26 : writeInt(1, to);
365 26 : break;
366 : }
367 :
368 : case wopAddIndirectRoot: {
369 0 : Path path = absPath(readString(from));
370 0 : startWork();
371 0 : store->addIndirectRoot(path);
372 0 : stopWork();
373 0 : writeInt(1, to);
374 0 : break;
375 : }
376 :
377 : case wopSyncWithGC: {
378 26 : startWork();
379 26 : store->syncWithGC();
380 26 : stopWork();
381 26 : writeInt(1, to);
382 26 : break;
383 : }
384 :
385 : case wopFindRoots: {
386 26 : startWork();
387 26 : Roots roots = store->findRoots();
388 26 : stopWork();
389 52 : writeInt(roots.size(), to);
390 138 : for (Roots::iterator i = roots.begin(); i != roots.end(); ++i) {
391 112 : writeString(i->first, to);
392 112 : writeString(i->second, to);
393 : }
394 26 : break;
395 : }
396 :
397 : case wopCollectGarbage: {
398 2 : GCOptions options;
399 2 : options.action = (GCOptions::GCAction) readInt(from);
400 2 : options.pathsToDelete = readStorePaths(from);
401 2 : options.ignoreLiveness = readInt(from);
402 2 : options.maxFreed = readLongLong(from);
403 2 : options.maxLinks = readInt(from);
404 :
405 2 : GCResults results;
406 :
407 2 : startWork();
408 2 : if (options.ignoreLiveness)
409 0 : throw Error("you are not allowed to ignore liveness");
410 2 : store->collectGarbage(options, results);
411 2 : stopWork();
412 :
413 2 : writeStringSet(results.paths, to);
414 2 : writeLongLong(results.bytesFreed, to);
415 2 : writeLongLong(results.blocksFreed, to);
416 :
417 2 : break;
418 : }
419 :
420 : case wopSetOptions: {
421 92 : keepFailed = readInt(from) != 0;
422 92 : keepGoing = readInt(from) != 0;
423 92 : tryFallback = readInt(from) != 0;
424 92 : verbosity = (Verbosity) readInt(from);
425 92 : maxBuildJobs = readInt(from);
426 92 : maxSilentTime = readInt(from);
427 92 : if (GET_PROTOCOL_MINOR(clientVersion) >= 2)
428 92 : useBuildHook = readInt(from) != 0;
429 92 : if (GET_PROTOCOL_MINOR(clientVersion) >= 4) {
430 92 : buildVerbosity = (Verbosity) readInt(from);
431 92 : logType = (LogType) readInt(from);
432 92 : printBuildTrace = readInt(from) != 0;
433 : }
434 92 : startWork();
435 92 : stopWork();
436 92 : break;
437 : }
438 :
439 : case wopQuerySubstitutablePathInfo: {
440 0 : Path path = absPath(readString(from));
441 0 : startWork();
442 0 : SubstitutablePathInfo info;
443 0 : bool res = store->querySubstitutablePathInfo(path, info);
444 0 : stopWork();
445 0 : writeInt(res ? 1 : 0, to);
446 0 : if (res) {
447 0 : writeString(info.deriver, to);
448 0 : writeStringSet(info.references, to);
449 0 : writeLongLong(info.downloadSize, to);
450 : }
451 0 : break;
452 : }
453 :
454 : default:
455 0 : throw Error(format("invalid operation %1%") % op);
456 : }
457 456 : }
458 :
459 :
460 92 : static void processConnection()
461 : {
462 : RemoveTempRoots removeTempRoots __attribute__((unused));
463 :
464 92 : canSendStderr = false;
465 92 : myPid = getpid();
466 92 : writeToStderr = tunnelStderr;
467 :
468 : /* Allow us to receive SIGPOLL for events on the client socket. */
469 92 : setSigPollAction(false);
470 92 : if (fcntl(from.fd, F_SETOWN, getpid()) == -1)
471 0 : throw SysError("F_SETOWN");
472 92 : if (fcntl(from.fd, F_SETFL, fcntl(from.fd, F_GETFL, 0) | FASYNC) == -1)
473 0 : throw SysError("F_SETFL");
474 :
475 : /* Exchange the greeting. */
476 92 : unsigned int magic = readInt(from);
477 92 : if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
478 92 : writeInt(WORKER_MAGIC_2, to);
479 :
480 92 : writeInt(PROTOCOL_VERSION, to);
481 92 : unsigned int clientVersion = readInt(from);
482 :
483 : /* Send startup error messages to the client. */
484 92 : startWork();
485 :
486 : try {
487 :
488 : /* If we can't accept clientVersion, then throw an error
489 : *here* (not above). */
490 :
491 : #if 0
492 : /* Prevent users from doing something very dangerous. */
493 : if (geteuid() == 0 &&
494 : querySetting("build-users-group", "") == "")
495 : throw Error("if you run `nix-worker' as root, then you MUST set `build-users-group'!");
496 : #endif
497 :
498 : /* Open the store. */
499 92 : store = boost::shared_ptr<StoreAPI>(new LocalStore());
500 :
501 92 : stopWork();
502 :
503 0 : } catch (Error & e) {
504 0 : stopWork(false, e.msg());
505 0 : return;
506 : }
507 :
508 : /* Process client requests. */
509 92 : unsigned int opCount = 0;
510 :
511 458 : while (true) {
512 : WorkerOp op;
513 : try {
514 550 : op = (WorkerOp) readInt(from);
515 92 : } catch (EndOfFile & e) {
516 : break;
517 : }
518 :
519 458 : opCount++;
520 :
521 : try {
522 458 : performOp(clientVersion, from, to, op);
523 4 : } catch (Error & e) {
524 2 : stopWork(false, e.msg());
525 : }
526 :
527 458 : assert(!canSendStderr);
528 : };
529 :
530 184 : printMsg(lvlError, format("%1% worker operations") % opCount);
531 : }
532 :
533 :
/* SIGCHLD handler: reap every child that has already terminated, without
   blocking (WNOHANG), so that none of them lingers as a zombie. */
static void sigChldHandler(int sigNo)
{
    /* waitpid() sets errno (e.g. to ECHILD once no children remain).  A
       signal handler must not leak that into the interrupted code, which
       may be about to test errno (the accept() loop does), so restore
       it. */
    int savedErrno = errno;
    while (waitpid(-1, 0, WNOHANG) > 0) ;
    errno = savedErrno;
}
539 :
540 :
541 47 : static void setSigChldAction(bool autoReap)
542 : {
543 : struct sigaction act, oact;
544 47 : act.sa_handler = autoReap ? sigChldHandler : SIG_DFL;
545 47 : sigfillset(&act.sa_mask);
546 47 : act.sa_flags = 0;
547 47 : if (sigaction(SIGCHLD, &act, &oact))
548 0 : throw SysError("setting SIGCHLD handler");
549 47 : }
550 :
551 :
552 1 : static void daemonLoop()
553 : {
554 : /* Get rid of children automatically; don't let them become
555 : zombies. */
556 1 : setSigChldAction(true);
557 :
558 : /* Create and bind to a Unix domain socket. */
559 1 : AutoCloseFD fdSocket = socket(PF_UNIX, SOCK_STREAM, 0);
560 1 : if (fdSocket == -1)
561 0 : throw SysError("cannot create Unix domain socket");
562 :
563 1 : string socketPath = nixStateDir + DEFAULT_SOCKET_PATH;
564 :
565 1 : createDirs(dirOf(socketPath));
566 :
567 : /* Urgh, sockaddr_un allows path names of only 108 characters. So
568 : chdir to the socket directory so that we can pass a relative
569 : path name. */
570 1 : chdir(dirOf(socketPath).c_str());
571 1 : Path socketPathRel = "./" + baseNameOf(socketPath);
572 :
573 : struct sockaddr_un addr;
574 1 : addr.sun_family = AF_UNIX;
575 1 : if (socketPathRel.size() >= sizeof(addr.sun_path))
576 0 : throw Error(format("socket path `%1%' is too long") % socketPathRel);
577 1 : strcpy(addr.sun_path, socketPathRel.c_str());
578 :
579 1 : unlink(socketPath.c_str());
580 :
581 : /* Make sure that the socket is created with 0666 permission
582 : (everybody can connect --- provided they have access to the
583 : directory containing the socket). */
584 1 : mode_t oldMode = umask(0111);
585 1 : int res = bind(fdSocket, (struct sockaddr *) &addr, sizeof(addr));
586 1 : umask(oldMode);
587 1 : if (res == -1)
588 0 : throw SysError(format("cannot bind to socket `%1%'") % socketPath);
589 :
590 1 : chdir("/"); /* back to the root */
591 :
592 1 : if (listen(fdSocket, 5) == -1)
593 0 : throw SysError(format("cannot listen on socket `%1%'") % socketPath);
594 :
595 : /* Loop accepting connections. */
596 71 : while (1) {
597 :
598 : try {
599 : /* Important: the server process *cannot* open the
600 : Berkeley DB environment, because it doesn't like forks
601 : very much. */
602 72 : assert(!store);
603 :
604 : /* Accept a connection. */
605 : struct sockaddr_un remoteAddr;
606 72 : socklen_t remoteAddrLen = sizeof(remoteAddr);
607 :
608 : AutoCloseFD remote = accept(fdSocket,
609 72 : (struct sockaddr *) &remoteAddr, &remoteAddrLen);
610 72 : checkInterrupt();
611 72 : if (remote == -1)
612 26 : if (errno == EINTR)
613 26 : continue;
614 : else
615 0 : throw SysError("accepting connection");
616 :
617 46 : printMsg(lvlInfo, format("accepted connection %1%") % remote);
618 :
619 : /* Fork a child to handle the connection. */
620 : pid_t child;
621 46 : child = fork();
622 :
623 91 : switch (child) {
624 :
625 : case -1:
626 0 : throw SysError("unable to fork");
627 :
628 : case 0:
629 : try { /* child */
630 :
631 : /* Background the worker. */
632 46 : if (setsid() == -1)
633 0 : throw SysError(format("creating a new session"));
634 :
635 : /* Restore normal handling of SIGCHLD. */
636 46 : setSigChldAction(false);
637 :
638 : /* Since the daemon can be long-running, the
639 : settings may have changed. So force a reload. */
640 46 : reloadSettings();
641 :
642 : /* Handle the connection. */
643 46 : from.fd = remote;
644 46 : to.fd = remote;
645 46 : processConnection();
646 :
647 0 : } catch (std::exception & e) {
648 0 : std::cerr << format("child error: %1%\n") % e.what();
649 : }
650 46 : exit(0);
651 0 : }
652 :
653 0 : } catch (Interrupted & e) {
654 0 : throw;
655 0 : } catch (Error & e) {
656 0 : printMsg(lvlError, format("error processing connection: %1%") % e.msg());
657 : }
658 0 : }
659 : }
660 :
661 :
662 47 : void run(Strings args)
663 : {
664 47 : bool slave = false;
665 47 : bool daemon = false;
666 :
667 94 : for (Strings::iterator i = args.begin(); i != args.end(); ) {
668 47 : string arg = *i++;
669 47 : if (arg == "--slave") slave = true;
670 47 : if (arg == "--daemon") daemon = true;
671 : }
672 :
673 47 : if (slave) {
674 : /* This prevents us from receiving signals from the terminal
675 : when we're running in setuid mode. */
676 46 : if (setsid() == -1)
677 0 : throw SysError(format("creating a new session"));
678 :
679 46 : processConnection();
680 : }
681 :
682 1 : else if (daemon) {
683 1 : if (setuidMode)
684 0 : throw Error("daemon cannot be started in setuid mode");
685 1 : chdir("/");
686 1 : daemonLoop();
687 : }
688 :
689 : else
690 0 : throw Error("must be run in either --slave or --daemon mode");
691 46 : }
692 :
693 :
694 : #include "help.txt.hh"
695 :
696 0 : void printHelp()
697 : {
698 0 : std::cout << string((char *) helpText, sizeof helpText);
699 0 : }
700 :
701 0 :
702 186 : string programId = "nix-worker";
|