pipe.c 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  /*
   * File      : pipe.c
   * This file is part of RT-Thread RTOS
   * COPYRIGHT (C) 2012-2017, RT-Thread Development Team
   *
   *  This program is free software; you can redistribute it and/or modify
   *  it under the terms of the GNU General Public License as published by
   *  the Free Software Foundation; either version 2 of the License, or
   *  (at your option) any later version.
   *
   *  This program is distributed in the hope that it will be useful,
   *  but WITHOUT ANY WARRANTY; without even the implied warranty of
   *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   *  GNU General Public License for more details.
   *
   *  You should have received a copy of the GNU General Public License along
   *  with this program; if not, write to the Free Software Foundation, Inc.,
   *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
   *
   * Change Logs:
   * Date           Author       Notes
   * 2012-09-30     Bernard      first version.
   */
  24. #include <rthw.h>
  25. #include <rtdevice.h>
  26. #if defined(RT_USING_POSIX)
  27. #include <dfs_file.h>
  28. #include <dfs_posix.h>
  29. #include <dfs_poll.h>
  30. static int pipe_fops_open(struct dfs_fd *fd)
  31. {
  32. rt_device_t device;
  33. rt_pipe_t *pipe;
  34. pipe = (rt_pipe_t *)fd->data;
  35. if (!pipe) return -1;
  36. device = &(pipe->parent);
  37. rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
  38. if (device->ref_count == 0)
  39. {
  40. pipe->fifo = rt_ringbuffer_create(PIPE_BUFSZ);
  41. }
  42. switch (fd->flags & O_ACCMODE)
  43. {
  44. case O_RDONLY:
  45. pipe->readers ++;
  46. break;
  47. case O_WRONLY:
  48. pipe->writers ++;
  49. break;
  50. case O_RDWR:
  51. pipe->readers ++;
  52. pipe->writers ++;
  53. break;
  54. }
  55. device->ref_count ++;
  56. rt_mutex_release(&(pipe->lock));
  57. return 0;
  58. }
  59. static int pipe_fops_close(struct dfs_fd *fd)
  60. {
  61. rt_device_t device;
  62. rt_pipe_t *pipe;
  63. pipe = (rt_pipe_t *)fd->data;
  64. if (!pipe) return -1;
  65. device = &(pipe->parent);
  66. rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
  67. switch (fd->flags & O_ACCMODE)
  68. {
  69. case O_RDONLY:
  70. pipe->readers --;
  71. break;
  72. case O_WRONLY:
  73. pipe->writers --;
  74. break;
  75. case O_RDWR:
  76. pipe->readers --;
  77. pipe->writers --;
  78. break;
  79. }
  80. if (pipe->writers == 0)
  81. {
  82. rt_wqueue_wakeup(&(pipe->reader_queue), (void*)(POLLIN | POLLERR | POLLHUP));
  83. }
  84. if (pipe->readers == 0)
  85. {
  86. rt_wqueue_wakeup(&(pipe->writer_queue), (void*)(POLLOUT | POLLERR | POLLHUP));
  87. }
  88. if (device->ref_count == 1)
  89. {
  90. rt_free(pipe->fifo);
  91. pipe->fifo = RT_NULL;
  92. }
  93. device->ref_count --;
  94. rt_mutex_release(&(pipe->lock));
  95. return 0;
  96. }
  97. static int pipe_fops_ioctl(struct dfs_fd *fd, int cmd, void *args)
  98. {
  99. rt_pipe_t *pipe;
  100. int ret = 0;
  101. pipe = (rt_pipe_t *)fd->data;
  102. switch (cmd)
  103. {
  104. case FIONREAD:
  105. *((int*)args) = rt_ringbuffer_data_len(pipe->fifo);
  106. break;
  107. case FIONWRITE:
  108. *((int*)args) = rt_ringbuffer_space_len(pipe->fifo);
  109. break;
  110. default:
  111. ret = -EINVAL;
  112. break;
  113. }
  114. return ret;
  115. }
  116. static int pipe_fops_read(struct dfs_fd *fd, void *buf, size_t count)
  117. {
  118. int len = 0;
  119. rt_pipe_t *pipe;
  120. pipe = (rt_pipe_t *)fd->data;
  121. /* no process has the pipe open for writing, return end-of-file */
  122. if (pipe->writers == 0)
  123. return 0;
  124. rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
  125. while (1)
  126. {
  127. if (pipe->writers == 0)
  128. {
  129. goto out;
  130. }
  131. len = rt_ringbuffer_get(pipe->fifo, buf, count);
  132. if (len > 0)
  133. {
  134. break;
  135. }
  136. else
  137. {
  138. if (fd->flags & O_NONBLOCK)
  139. {
  140. len = -EAGAIN;
  141. goto out;
  142. }
  143. rt_mutex_release(&pipe->lock);
  144. rt_wqueue_wakeup(&(pipe->writer_queue), (void*)POLLOUT);
  145. rt_wqueue_wait(&(pipe->reader_queue), 0, -1);
  146. rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
  147. }
  148. }
  149. /* wakeup writer */
  150. rt_wqueue_wakeup(&(pipe->writer_queue), (void*)POLLOUT);
  151. out:
  152. rt_mutex_release(&pipe->lock);
  153. return len;
  154. }
  155. static int pipe_fops_write(struct dfs_fd *fd, const void *buf, size_t count)
  156. {
  157. int len;
  158. rt_pipe_t *pipe;
  159. int wakeup = 0;
  160. int ret = 0;
  161. uint8_t *pbuf;
  162. pipe = (rt_pipe_t *)fd->data;
  163. if (pipe->readers == 0)
  164. {
  165. ret = -EPIPE;
  166. goto out;
  167. }
  168. if (count == 0)
  169. return 0;
  170. pbuf = (uint8_t*)buf;
  171. rt_mutex_take(&pipe->lock, -1);
  172. while (1)
  173. {
  174. if (pipe->readers == 0)
  175. {
  176. if (ret == 0)
  177. ret = -EPIPE;
  178. break;
  179. }
  180. len = rt_ringbuffer_put(pipe->fifo, pbuf, count - ret);
  181. ret += len;
  182. pbuf += len;
  183. wakeup = 1;
  184. if (ret == count)
  185. {
  186. break;
  187. }
  188. else
  189. {
  190. if (fd->flags & O_NONBLOCK)
  191. {
  192. if (ret == 0)
  193. {
  194. ret = -EAGAIN;
  195. }
  196. break;
  197. }
  198. }
  199. rt_mutex_release(&pipe->lock);
  200. rt_wqueue_wakeup(&(pipe->reader_queue), (void*)POLLIN);
  201. /* pipe full, waiting on suspended write list */
  202. rt_wqueue_wait(&(pipe->writer_queue), 0, -1);
  203. rt_mutex_take(&pipe->lock, -1);
  204. }
  205. rt_mutex_release(&pipe->lock);
  206. if (wakeup)
  207. {
  208. rt_wqueue_wakeup(&(pipe->reader_queue), (void*)POLLIN);
  209. }
  210. out:
  211. return ret;
  212. }
  213. static int pipe_fops_poll(struct dfs_fd *fd, rt_pollreq_t *req)
  214. {
  215. int mask = 0;
  216. rt_pipe_t *pipe;
  217. int mode = 0;
  218. pipe = (rt_pipe_t *)fd->data;
  219. rt_poll_add(&(pipe->reader_queue), req);
  220. rt_poll_add(&(pipe->writer_queue), req);
  221. switch (fd->flags & O_ACCMODE)
  222. {
  223. case O_RDONLY:
  224. mode = 1;
  225. break;
  226. case O_WRONLY:
  227. mode = 2;
  228. break;
  229. case O_RDWR:
  230. mode = 3;
  231. break;
  232. }
  233. if (mode & 1)
  234. {
  235. if (rt_ringbuffer_data_len(pipe->fifo) != 0)
  236. {
  237. mask |= POLLIN;
  238. }
  239. if (pipe->writers == 0)
  240. {
  241. mask |= POLLHUP;
  242. }
  243. }
  244. if (mode & 2)
  245. {
  246. if (rt_ringbuffer_space_len(pipe->fifo) != 0)
  247. {
  248. mask |= POLLOUT;
  249. }
  250. if (pipe->readers == 0)
  251. {
  252. mask |= POLLERR;
  253. }
  254. }
  255. return mask;
  256. }
  257. static const struct dfs_file_ops pipe_fops =
  258. {
  259. pipe_fops_open,
  260. pipe_fops_close,
  261. pipe_fops_ioctl,
  262. pipe_fops_read,
  263. pipe_fops_write,
  264. RT_NULL,
  265. RT_NULL,
  266. RT_NULL,
  267. pipe_fops_poll,
  268. };
  269. rt_pipe_t *rt_pipe_create(const char *name)
  270. {
  271. rt_pipe_t *pipe;
  272. rt_device_t dev;
  273. pipe = rt_malloc(sizeof(rt_pipe_t));
  274. if (pipe == RT_NULL) return RT_NULL;
  275. rt_memset(pipe, 0, sizeof(rt_pipe_t));
  276. rt_mutex_init(&(pipe->lock), name, RT_IPC_FLAG_FIFO);
  277. rt_list_init(&(pipe->reader_queue));
  278. rt_list_init(&(pipe->writer_queue));
  279. dev = &(pipe->parent);
  280. dev->type = RT_Device_Class_Pipe;
  281. if (rt_device_register(&(pipe->parent), name, RT_DEVICE_FLAG_RDWR | RT_DEVICE_FLAG_REMOVABLE) != 0)
  282. {
  283. rt_free(pipe);
  284. return RT_NULL;
  285. }
  286. dev->fops = (void*)&pipe_fops;
  287. return pipe;
  288. }
  289. int rt_pipe_delete(const char *name)
  290. {
  291. int result = 0;
  292. rt_device_t device;
  293. device = rt_device_find(name);
  294. if (device)
  295. {
  296. if (device->type == RT_Device_Class_Pipe)
  297. {
  298. rt_pipe_t *pipe;
  299. if (device->ref_count != 0)
  300. {
  301. return -RT_EBUSY;
  302. }
  303. pipe = (rt_pipe_t *)device;
  304. rt_mutex_detach(&(pipe->lock));
  305. rt_device_unregister(device);
  306. rt_free(pipe);
  307. }
  308. else
  309. {
  310. result = -1;
  311. }
  312. }
  313. else
  314. {
  315. result = -1;
  316. }
  317. return result;
  318. }
  319. int pipe(int fildes[2])
  320. {
  321. rt_pipe_t *pipe;
  322. char dname[8];
  323. char dev_name[32];
  324. static int pipeno = 0;
  325. rt_snprintf(dname, sizeof(dname), "pipe%d", pipeno++);
  326. pipe = rt_pipe_create(dname);
  327. if (pipe == RT_NULL)
  328. {
  329. return -1;
  330. }
  331. rt_snprintf(dev_name, sizeof(dev_name), "/dev/%s", dname);
  332. fildes[0] = open(dev_name, O_RDONLY, 0);
  333. if (fildes[0] < 0)
  334. {
  335. return -1;
  336. }
  337. fildes[1] = open(dev_name, O_WRONLY, 0);
  338. if (fildes[1] < 0)
  339. {
  340. close(fildes[1]);
  341. return -1;
  342. }
  343. return 0;
  344. }
  345. int mkfifo(const char *path, mode_t mode)
  346. {
  347. rt_pipe_t *pipe;
  348. pipe = rt_pipe_create(path);
  349. if (pipe == RT_NULL)
  350. {
  351. return -1;
  352. }
  353. return 0;
  354. }
  355. #endif