diff options
Diffstat (limited to '')
44 files changed, 8295 insertions, 0 deletions
diff --git a/wsrep-lib/wsrep-API/CMakeLists.txt b/wsrep-lib/wsrep-API/CMakeLists.txt new file mode 100644 index 00000000..51bf81af --- /dev/null +++ b/wsrep-lib/wsrep-API/CMakeLists.txt @@ -0,0 +1,6 @@ +add_library(wsrep_api_v26 + v26/wsrep_dummy.c + v26/wsrep_gtid.c + v26/wsrep_loader.c + v26/wsrep_uuid.c +) diff --git a/wsrep-lib/wsrep-API/v26/.gitignore b/wsrep-lib/wsrep-API/v26/.gitignore new file mode 100644 index 00000000..4ec5efc6 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/.gitignore @@ -0,0 +1,10 @@ +*~ +*.o +*.a +*.diff +listener +CMakeFiles +CMakeCache.txt +cmake_install.cmake +Makefile +examples/node/node diff --git a/wsrep-lib/wsrep-API/v26/CMakeLists.txt b/wsrep-lib/wsrep-API/v26/CMakeLists.txt new file mode 100644 index 00000000..bd66d989 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/CMakeLists.txt @@ -0,0 +1,30 @@ +# Copyright (c) 2018, Codership Oy. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) + +INCLUDE_DIRECTORIES(".") + +SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wextra -Werror -Wconversion") + +IF (NOT CMAKE_BUILD_TYPE) + SET(CMAKE_BUILD_TYPE Release) +ENDIF() + +SET(WSREP_SOURCES wsrep_gtid.c wsrep_uuid.c wsrep_loader.c wsrep_dummy.c) + +ADD_LIBRARY(wsrep ${WSREP_SOURCES}) + +ADD_SUBDIRECTORY(examples) diff --git a/wsrep-lib/wsrep-API/v26/CONTRIBUTORS.txt b/wsrep-lib/wsrep-API/v26/CONTRIBUTORS.txt new file mode 100644 index 00000000..2943b885 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/CONTRIBUTORS.txt @@ -0,0 +1,28 @@ +All contributors are required to add their name and [Github username/email] +to this file in connection with their first contribution. If you are making +a contribution on behalf of a company, you should add the said company name. + +By adding your name and [Github username/email] to this file you agree that +your contribution is a contribution under a contributor agreement between +you and Codership Oy. To the extent that you are an employee of a company and +contribute in that role, you confirm that your contribution is a contribution +under the contribution license agreement between your employer and Codership +Oy; and that you have the authorization to give such confirmation. You confirm +that you have read, understood and signed the contributor license agreement +applicable to you. + +For the individual contributor agreement see file CONTRIBUTOR_AGREEMENT.txt +in the same directory as this file. + +Authors from Codership Oy: + + * Seppo Jaakola <seppo.jaakola@galeracluster.com>, Codership Oy + * Teemu Ollakka <teemu.ollakka@galeracluster.com>, Codership Oy + * Alexey Yurchenko <alexey.yurchenko@galeracluster.com>, Codership Oy + * Mario Karuza <mario.karuza@galeracluster.com>, Codership Oy + * Daniele Sciascia <daniele.sciascia@galeracluster.com>, Codership Oy + [Codership employees, add name and email/username above this line, but leave this line intact] + +Other contributors: + + [add name and email/username above this line, but leave this line intact] diff --git a/wsrep-lib/wsrep-API/v26/CONTRIBUTOR_AGREEMENT.txt b/wsrep-lib/wsrep-API/v26/CONTRIBUTOR_AGREEMENT.txt new file mode 100644 index 00000000..8bdec2fd --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/CONTRIBUTOR_AGREEMENT.txt @@ -0,0 +1,218 @@ + Codership + Contributor License Agreement + Codership CLA + +Thank you for your interest in contributing to Galera Cluster, a project +managed by Codership Oy, a legal entity established under Finnish laws, with +its principal address at Pohjolankatu 64 A, 00600 Helsinki Finland ("We", "Us" +or "Our"). + +This contributor agreement ("Agreement") documents the rights granted by +contributors to Us. To make this document effective, please either accept it +in an electronic service such as clahub.com or sign and scan it and send it to +Us by email. This is a legally binding document, so please read it carefully +before agreeing to it. This Agreement covers the Galera Cluster project: the +Galera library, the wsrep-lib library, the wsrep-API library, the Wsrep patch +for MySQL and other eventual patches to MySQL or other technologies. + +1. Definitions + +"You" means the individual who Submits a Contribution to Us or the Legal +Entity on behalf of whom a Contribution has been Submitted to Us. "Legal +Entity" means an entity which is not a natural person. "Affiliates" means +other Legal Entities that control, are controlled by, or under common control +with that Legal Entity. For the purposes of this definition, "control" means +(i) the power, direct or indirect, to cause the direction or management of +such Legal Entity, whether by contract or otherwise, (ii) ownership of fifty +percent (50%) or more of the outstanding shares or securities which vote to +elect the management or other persons who direct such Legal Entity or (iii) +beneficial ownership of such entity. + +"Contribution" means any work of authorship that is Submitted by You to Us in +which You own or assert ownership of the Copyright. If You do not own the +Copyright in the entire work of authorship, you need to have a separate +permission from Us. + +"Copyright" means all rights protecting works of authorship owned or +controlled by You, including copyright, moral and neighboring rights, as +appropriate, for the full term of their existence including any extensions by +You. + +"Material" means the work of authorship which is made available by Us to third +parties, i.e. the Galera library, the Wsrep patch for MySQL; other eventual +patches to MySQL; other eventual patches to other database technologies; all +these together with a database technology, such as MySQL, or its +derivatives. After You Submit the Contribution, it may be included in the +Material. + +"Submit" means any form of electronic, verbal, or written communication sent +to Us or our representatives, including but not limited to electronic mailing +lists, source code control systems, and issue tracking systems that are +managed by, or on behalf of, Us for the purpose of discussing and improving +the Material, provided that such communication is (i) conspicuously marked or +otherwise designated in writing by You or Your employee as a "Contribution" or +(ii) submitted in source code control system pursuant to Section 3 (e). + +"Submission Date" means the date on which You Submit a Contribution to Us. + +"Effective Date" means the date You execute this Agreement or the date You +first Submit a Contribution to Us, whichever is earlier. + +"Media" means any portion of a Contribution which is not software. + +2. Grant of Rights + +2.1 Copyright License + +(a) You retain ownership of the Copyright in Your Contribution and have the +same rights to use or license the Contribution which You would have had +without entering into the Agreement. In case we have in writing permitted +submitting a sublicense to licensed rights, You will not transfer the original +license, but grant us a sublicense in accordance with this Agreement. + +(b) To the maximum extent permitted by the relevant law, You grant to Us a +perpetual, worldwide, non-exclusive, transferable, royalty-free, irrevocable +license under the Copyright covering the Contribution, with the right to +sublicense such rights through multiple tiers of sublicensees, to reproduce, +modify, display, perform and distribute the Contribution as part of the +Material; provided that this license is conditioned upon compliance with +Section 2.3. + +2.2 Patent License + +For patent claims including, without limitation, method, process, and +apparatus claims which You, or in case You are a Legal Entity, You or Your +Affiliates, own, control or have the right to grant, now or in the future, You +grant to Us a perpetual, worldwide, non-exclusive, transferable, royalty-free, +irrevocable patent license, with the right to sublicense these rights to +multiple tiers of sublicensees, to make, have made, use, sell, offer for sale, +import and otherwise transfer the Contribution and the Contribution in +combination with the Material (and portions of such combination). This license +is granted only to the extent that the exercise of the licensed rights +infringes such patent claims; and provided that this license is conditioned +upon compliance with Section 2.3. + +2.3 Outbound License + +As a condition on the grant of rights in Sections 2.1 and 2.2, to the extent +we include Your Contribution or a part of it in a Material, we agree to +license the Contribution under the terms of the license or licenses which We +are using on the Submission Date for the Material or any licenses which are +approved by the Open Source Initiative ("OSI") on or after the Effective Date, +including both permissive and copyleft licenses, whether or not such licenses +are subsequently disapproved (including any right to adopt any future version +of a license if approved by the OSI). For clarity, this entitles us to license +Your Contribution also under a permissive open source license, such as the MIT +license, and include binaries created under the MIT license in a proprietary +licensed whole. + +In addition to above defined licenses, We may use the following licenses for +Media in the Contribution: Creative Commons BY 3.0 or Creative Commons BY-SA +3.0 (including the right to adopt any future version of a license). + +2.4 Moral Rights. + +If moral rights apply to the Contribution, to the maximum extent permitted by +law, You waive and agree not to assert such moral rights against Us or our +successors in interest, or any of our licensees, either direct or indirect. + +2.5 Enforcement. + +You, as a copyright holder to Your Contribution, hereby authorize us to +enforce the OSI approved license applied by Us to a Material, but only to the +extent Your Contribution has been included in a Material and always subject to +Our free discretion on whether such enforcement is necessary or not. + +2.6 Our Rights. + +You acknowledge that We are not obligated to use Your Contribution as part of +the Material and may decide to include any Contribution We consider +appropriate. + +2.7 Reservation of Rights. + +Any rights not expressly licensed under this section are expressly reserved by +You. + +3. Agreement + +You confirm that: + +(a) You have the legal authority to enter into this Agreement. + +(b) You or Your Affiliates, own the Copyright and patent claims covering the + Contribution which are required to grant the rights under Section 2. + +(c) The grant of rights under Section 2 does not violate any grant of rights + which You or Your Affiliates have made to third parties, including Your + employer. If You are an employee, You have had Your employer approve this + Agreement or sign the Entity version of this document. If You are less + than eighteen years old, please have Your parents or guardian sign the + Agreement. + +(d) You have not Submitted any Code You do not own without written permission + from US. + +(e) All pull or merge requests issued under usernames confirmed by You in + writing are issued by You; and all such pull or merge requests contain + Your Contributions under this Agreement. You will notify Us in writing in + the event of You no longer control such usernames. + +4. Disclaimer + +EXCEPT FOR THE EXPRESS WARRANTIES IN SECTION 3, THE CONTRIBUTION IS PROVIDED +"AS IS". MORE PARTICULARLY, ALL EXPRESS OR IMPLIED WARRANTIES INCLUDING, +WITHOUT LIMITATION, ANY IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NON-INFRINGEMENT ARE EXPRESSLY DISCLAIMED BY YOU TO +US. TO THE EXTENT THAT ANY SUCH WARRANTIES CANNOT BE DISCLAIMED, SUCH WARRANTY +IS LIMITED IN DURATION TO THE MINIMUM PERIOD PERMITTED BY LAW. + +5. Consequential Damage Waiver + +TO THE MAXIMUM EXTENT PERMITTED BY APPLICABLE LAW, IN NO EVENT WILL YOU BE +LIABLE FOR ANY LOSS OF PROFITS, LOSS OF ANTICIPATED SAVINGS, LOSS OF DATA, +INDIRECT, SPECIAL, INCIDENTAL, CONSEQUENTIAL AND EXEMPLARY DAMAGES ARISING OUT +OF THIS AGREEMENT REGARDLESS OF THE LEGAL OR EQUITABLE THEORY (CONTRACT, TORT +OR OTHERWISE) UPON WHICH THE CLAIM IS BASED. + +THIS WAIVER DOES NOT APPLY TO GROSS NEGLIGENT OR MALICIOUS ACTS OR FRAUD. + +6. Miscellaneous + +6.1 This Agreement will be governed by and construed in accordance with the +laws of Finland excluding its conflicts of law provisions. Under certain +circumstances, the governing law in this section might be superseded by the +United Nations Convention on Contracts for the International Sale of Goods +("UN Convention") and the parties intend to avoid the application of the UN +Convention to this Agreement and, thus, exclude the application of the UN +Convention in its entirety to this Agreement. + +6.2 Any and all Submissions done by You prior to execution of this Agreement +shall be nonetheless covered by this Agreement. + +6.3 This Agreement sets out the entire agreement between You and Us for Your +Contributions to Us and overrides all other agreements or understandings. + +6.4 If You or We assign the rights or obligations received through this +Agreement to a third party, as a condition of the assignment, that third party +must agree in writing to abide by all the rights and obligations in the +Agreement. + +6.5 The failure of either party to require performance by the other party of +any provision of this Agreement in one situation shall not affect the right of +a party to require such performance at any time in the future. A waiver of +performance under a provision in one situation shall not be considered a +waiver of the performance of the provision in the future or a waiver of the +provision in its entirety. + +6.6 If any provision of this Agreement is found void and unenforceable, such +provision will be replaced to the extent possible with a provision that comes +closest to the meaning of the original provision and which is enforceable. +The terms and conditions set forth in this Agreement shall apply +notwithstanding any failure of essential purpose of this Agreement or any +limited remedy to the maximum extent possible under law. + +This document has been drafted based on Harmony Inividual Contributor License +Agreement (HA-CLA-I) Version 1.0 July 4, 2011. HA-CLA-I is available from +harmonyagreements.org and is licensed by under Creative Commons Attribution +3.0 Unported License. diff --git a/wsrep-lib/wsrep-API/v26/COPYING b/wsrep-lib/wsrep-API/v26/COPYING new file mode 100644 index 00000000..d159169d --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/COPYING @@ -0,0 +1,339 @@ + GNU GENERAL PUBLIC LICENSE + Version 2, June 1991 + + Copyright (C) 1989, 1991 Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The licenses for most software are designed to take away your +freedom to share and change it. By contrast, the GNU General Public +License is intended to guarantee your freedom to share and change free +software--to make sure the software is free for all its users. This +General Public License applies to most of the Free Software +Foundation's software and to any other program whose authors commit to +using it. (Some other Free Software Foundation software is covered by +the GNU Lesser General Public License instead.) You can apply it to +your programs, too. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +this service if you wish), that you receive source code or can get it +if you want it, that you can change the software or use pieces of it +in new free programs; and that you know you can do these things. + + To protect your rights, we need to make restrictions that forbid +anyone to deny you these rights or to ask you to surrender the rights. +These restrictions translate to certain responsibilities for you if you +distribute copies of the software, or if you modify it. + + For example, if you distribute copies of such a program, whether +gratis or for a fee, you must give the recipients all the rights that +you have. You must make sure that they, too, receive or can get the +source code. And you must show them these terms so they know their +rights. + + We protect your rights with two steps: (1) copyright the software, and +(2) offer you this license which gives you legal permission to copy, +distribute and/or modify the software. + + Also, for each author's protection and ours, we want to make certain +that everyone understands that there is no warranty for this free +software. If the software is modified by someone else and passed on, we +want its recipients to know that what they have is not the original, so +that any problems introduced by others will not reflect on the original +authors' reputations. + + Finally, any free program is threatened constantly by software +patents. We wish to avoid the danger that redistributors of a free +program will individually obtain patent licenses, in effect making the +program proprietary. To prevent this, we have made it clear that any +patent must be licensed for everyone's free use or not licensed at all. + + The precise terms and conditions for copying, distribution and +modification follow. + + GNU GENERAL PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. This License applies to any program or other work which contains +a notice placed by the copyright holder saying it may be distributed +under the terms of this General Public License. The "Program", below, +refers to any such program or work, and a "work based on the Program" +means either the Program or any derivative work under copyright law: +that is to say, a work containing the Program or a portion of it, +either verbatim or with modifications and/or translated into another +language. (Hereinafter, translation is included without limitation in +the term "modification".) Each licensee is addressed as "you". + +Activities other than copying, distribution and modification are not +covered by this License; they are outside its scope. The act of +running the Program is not restricted, and the output from the Program +is covered only if its contents constitute a work based on the +Program (independent of having been made by running the Program). +Whether that is true depends on what the Program does. + + 1. You may copy and distribute verbatim copies of the Program's +source code as you receive it, in any medium, provided that you +conspicuously and appropriately publish on each copy an appropriate +copyright notice and disclaimer of warranty; keep intact all the +notices that refer to this License and to the absence of any warranty; +and give any other recipients of the Program a copy of this License +along with the Program. + +You may charge a fee for the physical act of transferring a copy, and +you may at your option offer warranty protection in exchange for a fee. + + 2. You may modify your copy or copies of the Program or any portion +of it, thus forming a work based on the Program, and copy and +distribute such modifications or work under the terms of Section 1 +above, provided that you also meet all of these conditions: + + a) You must cause the modified files to carry prominent notices + stating that you changed the files and the date of any change. + + b) You must cause any work that you distribute or publish, that in + whole or in part contains or is derived from the Program or any + part thereof, to be licensed as a whole at no charge to all third + parties under the terms of this License. + + c) If the modified program normally reads commands interactively + when run, you must cause it, when started running for such + interactive use in the most ordinary way, to print or display an + announcement including an appropriate copyright notice and a + notice that there is no warranty (or else, saying that you provide + a warranty) and that users may redistribute the program under + these conditions, and telling the user how to view a copy of this + License. (Exception: if the Program itself is interactive but + does not normally print such an announcement, your work based on + the Program is not required to print an announcement.) + +These requirements apply to the modified work as a whole. If +identifiable sections of that work are not derived from the Program, +and can be reasonably considered independent and separate works in +themselves, then this License, and its terms, do not apply to those +sections when you distribute them as separate works. But when you +distribute the same sections as part of a whole which is a work based +on the Program, the distribution of the whole must be on the terms of +this License, whose permissions for other licensees extend to the +entire whole, and thus to each and every part regardless of who wrote it. + +Thus, it is not the intent of this section to claim rights or contest +your rights to work written entirely by you; rather, the intent is to +exercise the right to control the distribution of derivative or +collective works based on the Program. + +In addition, mere aggregation of another work not based on the Program +with the Program (or with a work based on the Program) on a volume of +a storage or distribution medium does not bring the other work under +the scope of this License. + + 3. You may copy and distribute the Program (or a work based on it, +under Section 2) in object code or executable form under the terms of +Sections 1 and 2 above provided that you also do one of the following: + + a) Accompany it with the complete corresponding machine-readable + source code, which must be distributed under the terms of Sections + 1 and 2 above on a medium customarily used for software interchange; or, + + b) Accompany it with a written offer, valid for at least three + years, to give any third party, for a charge no more than your + cost of physically performing source distribution, a complete + machine-readable copy of the corresponding source code, to be + distributed under the terms of Sections 1 and 2 above on a medium + customarily used for software interchange; or, + + c) Accompany it with the information you received as to the offer + to distribute corresponding source code. (This alternative is + allowed only for noncommercial distribution and only if you + received the program in object code or executable form with such + an offer, in accord with Subsection b above.) + +The source code for a work means the preferred form of the work for +making modifications to it. For an executable work, complete source +code means all the source code for all modules it contains, plus any +associated interface definition files, plus the scripts used to +control compilation and installation of the executable. However, as a +special exception, the source code distributed need not include +anything that is normally distributed (in either source or binary +form) with the major components (compiler, kernel, and so on) of the +operating system on which the executable runs, unless that component +itself accompanies the executable. + +If distribution of executable or object code is made by offering +access to copy from a designated place, then offering equivalent +access to copy the source code from the same place counts as +distribution of the source code, even though third parties are not +compelled to copy the source along with the object code. + + 4. You may not copy, modify, sublicense, or distribute the Program +except as expressly provided under this License. Any attempt +otherwise to copy, modify, sublicense or distribute the Program is +void, and will automatically terminate your rights under this License. +However, parties who have received copies, or rights, from you under +this License will not have their licenses terminated so long as such +parties remain in full compliance. + + 5. You are not required to accept this License, since you have not +signed it. However, nothing else grants you permission to modify or +distribute the Program or its derivative works. These actions are +prohibited by law if you do not accept this License. Therefore, by +modifying or distributing the Program (or any work based on the +Program), you indicate your acceptance of this License to do so, and +all its terms and conditions for copying, distributing or modifying +the Program or works based on it. + + 6. Each time you redistribute the Program (or any work based on the +Program), the recipient automatically receives a license from the +original licensor to copy, distribute or modify the Program subject to +these terms and conditions. You may not impose any further +restrictions on the recipients' exercise of the rights granted herein. +You are not responsible for enforcing compliance by third parties to +this License. + + 7. If, as a consequence of a court judgment or allegation of patent +infringement or for any other reason (not limited to patent issues), +conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot +distribute so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you +may not distribute the Program at all. For example, if a patent +license would not permit royalty-free redistribution of the Program by +all those who receive copies directly or indirectly through you, then +the only way you could satisfy both it and this License would be to +refrain entirely from distribution of the Program. + +If any portion of this section is held invalid or unenforceable under +any particular circumstance, the balance of the section is intended to +apply and the section as a whole is intended to apply in other +circumstances. + +It is not the purpose of this section to induce you to infringe any +patents or other property right claims or to contest validity of any +such claims; this section has the sole purpose of protecting the +integrity of the free software distribution system, which is +implemented by public license practices. Many people have made +generous contributions to the wide range of software distributed +through that system in reliance on consistent application of that +system; it is up to the author/donor to decide if he or she is willing +to distribute software through any other system and a licensee cannot +impose that choice. + +This section is intended to make thoroughly clear what is believed to +be a consequence of the rest of this License. + + 8. If the distribution and/or use of the Program is restricted in +certain countries either by patents or by copyrighted interfaces, the +original copyright holder who places the Program under this License +may add an explicit geographical distribution limitation excluding +those countries, so that distribution is permitted only in or among +countries not thus excluded. In such case, this License incorporates +the limitation as if written in the body of this License. + + 9. The Free Software Foundation may publish revised and/or new versions +of the General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + +Each version is given a distinguishing version number. If the Program +specifies a version number of this License which applies to it and "any +later version", you have the option of following the terms and conditions +either of that version or of any later version published by the Free +Software Foundation. If the Program does not specify a version number of +this License, you may choose any version ever published by the Free Software +Foundation. + + 10. If you wish to incorporate parts of the Program into other free +programs whose distribution conditions are different, write to the author +to ask for permission. For software which is copyrighted by the Free +Software Foundation, write to the Free Software Foundation; we sometimes +make exceptions for this. Our decision will be guided by the two goals +of preserving the free status of all derivatives of our free software and +of promoting the sharing and reuse of software generally. + + NO WARRANTY + + 11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY +FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN +OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES +PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED +OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS +TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE +PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, +REPAIR OR CORRECTION. + + 12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR +REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, +INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING +OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED +TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY +YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER +PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Programs + + If you develop a new program, and you want it to be of the greatest +possible use to the public, the best way to achieve this is to make it +free software which everyone can redistribute and change under these terms. + + To do so, attach the following notices to the program. It is safest +to attach them to the start of each source file to most effectively +convey the exclusion of warranty; and each file should have at least +the "copyright" line and a pointer to where the full notice is found. + + <one line to give the program's name and a brief idea of what it does.> + Copyright (C) <year> <name of author> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +Also add information on how to contact you by electronic and paper mail. + +If the program is interactive, make it output a short notice like this +when it starts in an interactive mode: + + Gnomovision version 69, Copyright (C) year name of author + Gnomovision comes with ABSOLUTELY NO WARRANTY; for details type `show w'. + This is free software, and you are welcome to redistribute it + under certain conditions; type `show c' for details. + +The hypothetical commands `show w' and `show c' should show the appropriate +parts of the General Public License. Of course, the commands you use may +be called something other than `show w' and `show c'; they could even be +mouse-clicks or menu items--whatever suits your program. + +You should also get your employer (if you work as a programmer) or your +school, if any, to sign a "copyright disclaimer" for the program, if +necessary. Here is a sample; alter the names: + + Yoyodyne, Inc., hereby disclaims all copyright interest in the program + `Gnomovision' (which makes passes at compilers) written by James Hacker. + + <signature of Ty Coon>, 1 April 1989 + Ty Coon, President of Vice + +This General Public License does not permit incorporating your program into +proprietary programs. If your program is a subroutine library, you may +consider it more useful to permit linking proprietary applications with the +library. If this is what you want to do, use the GNU Lesser General +Public License instead of this License. diff --git a/wsrep-lib/wsrep-API/v26/README.md b/wsrep-lib/wsrep-API/v26/README.md new file mode 100644 index 00000000..27664805 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/README.md @@ -0,0 +1,7 @@ +# Write Set Replication API specification + +Building: +``` +cmake [-DCMAKE_BUILD_TYPE=Debug|Release] . && make [VERBOSE=1] +``` +in top directory. diff --git a/wsrep-lib/wsrep-API/v26/examples/CMakeLists.txt b/wsrep-lib/wsrep-API/v26/examples/CMakeLists.txt new file mode 100644 index 00000000..e6e33b78 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/CMakeLists.txt @@ -0,0 +1,19 @@ +# Copyright (c) 2019, Codership Oy. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +ADD_EXECUTABLE(listener listener.c) +TARGET_LINK_LIBRARIES(listener wsrep dl pthread) + +ADD_SUBDIRECTORY(node) diff --git a/wsrep-lib/wsrep-API/v26/examples/README.md b/wsrep-lib/wsrep-API/v26/examples/README.md new file mode 100644 index 00000000..b1b20744 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/README.md @@ -0,0 +1,14 @@ +## wsrep API usage examples + +### 1. Listener +Is a simple program that connects and listens to replication events in +an existing cluster. + +Usage example (starting listener on the same host as the rest of the cluster): +``` +$ ./listener /path_to/libgalera_smm.so gcomm://localhost:4567?gmcast.listen_addr=tcp://127.0.0.1:9999 cluster_name +``` + +### 2. Node +Is a more complex program which implements most of wsrep node functionality +and can form clusters in itself. diff --git a/wsrep-lib/wsrep-API/v26/examples/listener.c b/wsrep-lib/wsrep-API/v26/examples/listener.c new file mode 100644 index 00000000..9fc881fe --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/listener.c @@ -0,0 +1,268 @@ +/* Copyright (C) 2012 Codership Oy <info@codersihp.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +/*! @file Example of wsrep event listener. Outputs description of received + * events to stdout. To get a general picture you should start with + * main() function. */ + +#include <wsrep_api.h> + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include <signal.h> +#include <pthread.h> + +/*! This is global application context, it will be used by wsrep callbacks */ +struct application_context +{}; + +static struct application_context global_ctx; + +/*! This is receiving thread context, it will be used by wsrep callbacks */ +struct receiver_context +{ + char msg[4096]; +}; + +/* wsrep provider handle (global for simplicty) */ +static wsrep_t* wsrep = NULL; + +/*! This is a logger callback which library will be using to log events. */ +static void +logger_cb (wsrep_log_level_t level __attribute__((unused)), const char* msg) +{ + fprintf (stderr, "WSREP: %s\n", msg); +} + +/*! This will be called on cluster view change (nodes joining, leaving, etc.). + * Each view change is the point where application may be pronounced out of + * sync with the current cluster view and need state transfer. + * It is guaranteed that no other callbacks are called concurrently with it. */ +static wsrep_cb_status_t +view_cb (void* app_ctx __attribute__((unused)), + void* recv_ctx __attribute__((unused)), + const wsrep_view_info_t* view, + const char* state __attribute__((unused)), + size_t state_len __attribute__((unused))) +{ + printf ("New cluster membership view: %d nodes, my index is %d, " + "global seqno: %lld\n", + view->memb_num, view->my_idx, (long long)view->state_id.seqno); + + return WSREP_CB_SUCCESS; +} + +/*! This will be called on cluster view change (nodes joining, leaving, etc.). + * Each view change is the point where application may be pronounced out of + * sync with the current cluster view and need state transfer. + * It is guaranteed that no other callbacks are called concurrently with it. */ +static wsrep_cb_status_t +sst_request_cb (void* app_ctx __attribute__((unused)), + void** sst_req, + size_t* sst_req_len) +{ + /* For simplicity we're skipping state transfer by using magic string + * as a state transfer request. + * This node will not be considered JOINED (having full state) + * by other cluster members. */ + *sst_req = strdup(WSREP_STATE_TRANSFER_NONE); + + if (*sst_req) + *sst_req_len = strlen(*sst_req) + 1; + else + *sst_req_len = 0; + + return WSREP_CB_SUCCESS; +} + +/*! This is called to "apply" writeset. + * If writesets don't conflict on keys, it may be called concurrently to + * utilize several CPU cores. */ +static wsrep_cb_status_t +apply_cb (void* recv_ctx, + const wsrep_ws_handle_t* ws_handle __attribute__((unused)), + uint32_t flags __attribute__((unused)), + const wsrep_buf_t* ws __attribute__((unused)), + const wsrep_trx_meta_t* meta, + wsrep_bool_t* exit_loop __attribute__((unused))) +{ + struct receiver_context* ctx = (struct receiver_context*)recv_ctx; + + snprintf (ctx->msg, sizeof(ctx->msg), + "Got writeset %lld, size %zu", (long long)meta->gtid.seqno, + ws->len); + + bool const commit = flags & (WSREP_FLAG_TRX_END | WSREP_FLAG_ROLLBACK); + + wsrep->commit_order_enter(wsrep, ws_handle, meta); + if (commit) puts(ctx->msg); + wsrep->commit_order_leave(wsrep, ws_handle, meta, NULL); + + return WSREP_CB_SUCCESS; +} + +/* The following callbacks are stubs and not used in this example. */ +static wsrep_cb_status_t +unordered_cb(void* recv_ctx __attribute__((unused)), + const wsrep_buf_t* data __attribute__((unused))) +{ + return WSREP_CB_SUCCESS; +} + +static wsrep_cb_status_t +sst_donate_cb (void* app_ctx __attribute__((unused)), + void* recv_ctx __attribute__((unused)), + const wsrep_buf_t* msg __attribute__((unused)), + const wsrep_gtid_t* state_id __attribute__((unused)), + const wsrep_buf_t* state __attribute__((unused)), + wsrep_bool_t bypass __attribute__((unused))) +{ + return WSREP_CB_SUCCESS; +} + +static wsrep_cb_status_t synced_cb (void* app_ctx __attribute__((unused))) +{ + return WSREP_CB_SUCCESS; +} + +/* This is the listening thread. It blocks in wsrep::recv() call until + * disconnect from cluster. It will apply and commit writesets through the + * callbacks defined avbove. */ +static void* +recv_thread (void* arg) +{ + struct receiver_context* ctx = (struct receiver_context*)arg; + + wsrep_status_t rc = wsrep->recv(wsrep, ctx); + + fprintf (stderr, "Receiver exited with code %d", rc); + + return NULL; +} + +/* This is a signal handler to demonstrate graceful cluster leave. */ +static void +graceful_leave (int signum) +{ + printf ("Got signal %d, exiting...\n", signum); + wsrep->disconnect(wsrep); +} + +int main (int const argc, char* argv[]) +{ + if (argc < 4 || argc > 5) + { + fprintf (stderr, "Usage: %s </path/to/wsrep/provider> <wsrep URI> " + "<cluster name> [own address]\n", argv[0]); + exit (EXIT_FAILURE); + } + + const char* const wsrep_provider = argv[1]; + const char* const wsrep_uri = argv[2]; + const char* const cluster_name = argv[3]; + const char* const own_address = argc == 5 ? argv[4] : "localhost"; + + /* Now let's load and initialize provider */ + wsrep_status_t rc = wsrep_load (wsrep_provider, &wsrep, logger_cb); + if (WSREP_OK != rc) + { + fprintf (stderr, "Failed to load wsrep provider '%s'\n",wsrep_provider); + exit (EXIT_FAILURE); + } + + wsrep_gtid_t state_id = { WSREP_UUID_UNDEFINED, WSREP_SEQNO_UNDEFINED }; + + /* wsrep provider initialization arguments */ + struct wsrep_init_args wsrep_args = + { + .app_ctx = &global_ctx, + + .node_name = "example listener", + .node_address = own_address, + .node_incoming = "", + .data_dir = ".", // working directory + .options = "", + .proto_ver = 127, // maximum supported application event protocol + + .state_id = &state_id, + .state = NULL, + + .logger_cb = logger_cb, + .view_cb = view_cb, + .sst_request_cb = sst_request_cb, + .encrypt_cb = NULL, + .apply_cb = apply_cb, + .unordered_cb = unordered_cb, + .sst_donate_cb = sst_donate_cb, + .synced_cb = synced_cb + }; + + rc = wsrep->init(wsrep, &wsrep_args); + if (WSREP_OK != rc) + { + fprintf (stderr, "wsrep::init() failed: %d\n", rc); + exit (EXIT_FAILURE); + } + + /* Connect to cluster */ + rc = wsrep->connect(wsrep, cluster_name, wsrep_uri, "", 0); + if (0 != rc) + { + if (rc < 0) + fprintf (stderr, "wsrep::connect(%s, %s) failed: %d (%s)\n", + cluster_name, wsrep_uri, rc, strerror(-(int)rc)); + else + fprintf (stderr, "wsrep::connect() failed: %d\n", rc); + + exit (EXIT_FAILURE); + } + + /* Now let's start several listening threads*/ + int const num_threads = 4; + struct receiver_context thread_ctx[num_threads]; + pthread_t threads[num_threads]; + + int i; + for (i = 0; i < num_threads; i++) + { + int err = pthread_create ( + &threads[i], NULL, recv_thread, &thread_ctx[i]); + + if (err) + { + fprintf (stderr, "Failed to start thread %d: %d (%s)", + i, err, strerror(err)); + exit (EXIT_FAILURE); + } + } + + signal (SIGTERM, graceful_leave); + signal (SIGINT, graceful_leave); + + /* Listening threads are now running and receiving writesets. Wait for them + * to join. Threads will join after signal handler closes wsrep connection*/ + for (i = 0; i < num_threads; i++) + { + pthread_join (threads[i], NULL); + } + + /* Unload provider after nobody uses it any more. */ + wsrep_unload (wsrep); + + return 0; +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/CMakeLists.txt b/wsrep-lib/wsrep-API/v26/examples/node/CMakeLists.txt new file mode 100644 index 00000000..d018afde --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/CMakeLists.txt @@ -0,0 +1,26 @@ +# Copyright (c) 2019, Codership Oy. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +FILE(GLOB SRC + "*.h" + "*.c" + ) + +ADD_EXECUTABLE(node ${SRC}) + +TARGET_LINK_LIBRARIES(node wsrep dl pthread) + +CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/node.sh + ${CMAKE_CURRENT_BINARY_DIR}/node.sh COPYONLY) diff --git a/wsrep-lib/wsrep-API/v26/examples/node/README.md b/wsrep-lib/wsrep-API/v26/examples/node/README.md new file mode 100644 index 00000000..4a07c149 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/README.md @@ -0,0 +1,81 @@ +# wsrep API node application + +## Overview + +This is a simple application to demonstrate the usage of wsrep API. It +deliberately does nothing useful in order to present as concentrated and +concise API usage as possible. + +The program is deliberately written in C to demonstrate the naked API usage. +For C++ example see a much more advanced integration library at +https://github.com/codership/wsrep-lib + +## High level architecture + +Process-wise the program consists of an endless main loop that periodically +samples and prints performance stats and a configurable number of "master" and +"slave" threads, with master threads loop executing "transactions" and +replicating resulting "write sets" and slave threads receiving and processing +the write sets from other nodes. + +Object-wise the program is composed of two main objects: `store` and `wsrep`. +'store' object contains application "state" and generates and commits changes +to the state. `wsrep` object contains cluster context and provides interface +to it. Changes generated by `store` are replicated and certified through +`wsrep` and then committed to `store`. + +## Unit descriptions (in alphabetical order) + +#### ctx.h +A small header to declare the application context structure. + +#### log.* +Implements logging functionality for the application AND +**a logging callback** for the wsrep provider. + +#### main.c +Defines `main()` routine that initializes storage and wsrep provider, starts +the worker threads and loops in a statistics collection loop. Even though it is +not designed to return it still shows the deinitialization order. + +#### options.* +Implements reading configuration options from the command line, does not have +anything related to wsrep API, but shows which additional parameters must be +configured for the program to make use of wsrep clustering. + +#### socket.* +Network sockets boilerplate code for setting TCP connections between processes +(for SST). Has nothing wsrep-related and can be ignored. + +#### sst.* +Defines **SST callbacks** for the wsrep provider and shows how to asynchronously +implement state snapshot transfer (yes, you don't want to spend eternity in +callbacks). + +#### stats.* +Implements performance stats collecting function for the main loop. While it is +an absolutely optional provider functionality, still it shows how to use that. + +#### store.* +Defines the `store` object that pretends to store and modify some data in a +"transactional" manner. It provides the caller that intends to do a change with +a *change data* and a *key* for replication and certification. + +#### trx.* +Defines routines to process local and replicated transactions. + +#### worker.* +Implements worker thread pool functinality. Worker threads run routines defined +in 'trx.*'. Also implements **apply callback** for the wsrep provider. + +#### wsrep.* +Maintains wsrep cluster context: provider instance and cluster membership view. +While there is little use for the latter in this primitive application, still +it shows **connected and view callbacks** usage. But mostly, for this +application its purpose is to initialize the provider, connect to the cluster +and offer access to initialized provider for other parts of the program. + +## Example usage +``` +./node -f /tmp/galera/0 -v /tmp/galera/0/galera/lib/libgalera_smm.so -o 'pc.weight=2;evs.send_window=2;evs.user_send_window=1;gcache.recover=no' -s 8 -m 16 +``` diff --git a/wsrep-lib/wsrep-API/v26/examples/node/ctx.h b/wsrep-lib/wsrep-API/v26/examples/node/ctx.h new file mode 100644 index 00000000..01653554 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/ctx.h @@ -0,0 +1,34 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines application context for wsrep provider + */ + +#ifndef NODE_CTX_H +#define NODE_CTX_H + +#include "store.h" +#include "wsrep.h" + +struct node_ctx +{ + node_wsrep_t* wsrep; + node_store_t* store; + const struct node_options* opts; +}; + +#endif /* NODE_CTX_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/log.c b/wsrep-lib/wsrep-API/v26/examples/node/log.c new file mode 100644 index 00000000..71f4705c --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/log.c @@ -0,0 +1,100 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "log.h" + +#include <stdio.h> // fprintf(), fflush() +#include <sys/time.h> // gettimeofday() +#include <time.h> // localtime_r() +#include <stdarg.h> // va_start(), va_end() + +wsrep_log_level_t node_log_max_level = WSREP_LOG_INFO; + +static const char* log_level_str[WSREP_LOG_DEBUG + 2] = +{ + "FATAL: ", + "ERROR: ", + " WARN: ", + " INFO: ", + "DEBUG: ", + "XXXXX: " +}; + +static inline void +log_timestamp_and_log(const char* const prefix, // source of msg + int const severity, + const char* const msg) +{ + struct tm date; + struct timeval time; + + gettimeofday(&time, NULL); + localtime_r (&time.tv_sec, &date); + + FILE* log_file = stderr; + fprintf(log_file, + "%04d-%02d-%02d %02d:%02d:%02d.%03d " /* timestamp fmt */ + "[%s] %s%s\n", /* [prefix] severity msg */ + date.tm_year + 1900, date.tm_mon + 1, date.tm_mday, + date.tm_hour, date.tm_min, date.tm_sec, + (int)time.tv_usec / 1000, + prefix, log_level_str[severity], msg + ); + + fflush (log_file); +} + +void +node_log_cb(wsrep_log_level_t const severity, const char* const msg) +{ + /* REPLICATION: let provider log messages be prefixed with 'wsrep'*/ + log_timestamp_and_log("wsrep", severity, msg); +} + +void +node_log(wsrep_log_level_t const severity, + const char* const file, + const char* const function, + int const line, + ...) +{ + va_list ap; + + char string[2048]; + int max_string = sizeof(string); + char* str = string; + + /* provide file:func():line info only if debug logging is on */ + if (NODE_DO_LOG_DEBUG) { + int const len = snprintf(str, (size_t)max_string, "%s:%s():%d: ", + file, function, line); + str += len; + max_string -= len; + } + + va_start(ap, line); + { + const char* format = va_arg (ap, const char*); + + if (max_string > 0 && NULL != format) { + vsnprintf (str, (size_t)max_string, format, ap); + } + } + va_end(ap); + + /* actual logging */ + log_timestamp_and_log(" node", severity, string); +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/log.h b/wsrep-lib/wsrep-API/v26/examples/node/log.h new file mode 100644 index 00000000..09404f26 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/log.h @@ -0,0 +1,69 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines logging macros for the application and + * a logger callback for the wsrep provider. + */ + +#ifndef NODE_LOG_H +#define NODE_LOG_H + +#include "../../wsrep_api.h" + +/** + * REPLICATION: a logger callback for wsrep provider + */ +extern void +node_log_cb(wsrep_log_level_t severity, const char* message); + +/** + * Applicaton log function intended to be used through the macros defined below. + * For simplicity it uses log levels defined by wsrep API, but it does not have + * to. */ +extern void +node_log (wsrep_log_level_t level, + const char* file, + const char* function, + const int line, + ...); + +/** + * This variable made global to avoid calling node_log() when debug logging + * is disabled. */ +extern wsrep_log_level_t node_log_max_level; +#define NODE_DO_LOG_DEBUG (WSREP_LOG_DEBUG <= node_log_max_level) + +/** + * Base logging macro that records current file, function and line number */ +#define NODE_LOG(level, ...)\ + node_log(level, __FILE__, __func__, __LINE__, __VA_ARGS__, NULL) + +/** + * @name Logging macros. + * Must be implemented as macros to report the location of the code where + * they are called. + */ +/*@{*/ +#define NODE_FATAL(...) NODE_LOG(WSREP_LOG_FATAL, __VA_ARGS__, NULL) +#define NODE_ERROR(...) NODE_LOG(WSREP_LOG_ERROR, __VA_ARGS__, NULL) +#define NODE_WARN(...) NODE_LOG(WSREP_LOG_WARN, __VA_ARGS__, NULL) +#define NODE_INFO(...) NODE_LOG(WSREP_LOG_INFO, __VA_ARGS__, NULL) +#define NODE_DEBUG(...) if (NODE_DO_LOG_DEBUG) \ + { NODE_LOG(WSREP_LOG_DEBUG, __VA_ARGS__, NULL); } +/*@}*/ + +#endif /* NODE_LOG_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/main.c b/wsrep-lib/wsrep-API/v26/examples/node/main.c new file mode 100644 index 00000000..f3124042 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/main.c @@ -0,0 +1,146 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "ctx.h" +#include "log.h" +#include "options.h" +#include "stats.h" +#include "worker.h" +#include "wsrep.h" + +#include <errno.h> +#include <signal.h> // sigaction() +#include <string.h> // strerror() + +static void +signal_handler(int const signum) +{ + NODE_INFO("Got signal %d. Terminating.", signum); +} + +static void +install_signal_handler(void) +{ + sigset_t sa_mask; + sigemptyset(&sa_mask); + + struct sigaction const act = + { + .sa_handler = signal_handler, + .sa_mask = sa_mask, + .sa_flags = (int)SA_RESETHAND + }; + + if (sigaction(SIGINT /* Ctrl-C */, &act, NULL)) + { + NODE_INFO("sigaction() failed: %d (%s)", errno, strerror(errno)); + abort(); + } +} + +int main(int argc, char* argv[]) +{ + install_signal_handler(); + + struct node_options opts; + int err = node_options_read(argc, argv, &opts); + if (err) + { + NODE_FATAL("Failed to read command line opritons: %d (%s)", + err, strerror(err)); + return err; + } + + struct node_ctx node; + node.opts = &opts; + + /* REPLICATION: before connecting to cluster we need to initialize our + * storage to know our current position (GTID) */ + node.store = node_store_open(&opts); + if (!node.store) + { + NODE_FATAL("Failed to open node store"); + return 1; + } + + wsrep_gtid_t current_gtid; + node_store_gtid(node.store, ¤t_gtid); + + /* REPLICATION: complete initialization of application context + * (including provider itself) */ + node.wsrep = node_wsrep_init(&opts, ¤t_gtid, &node); + if (!node.wsrep) + { + NODE_FATAL("Failed to initialize wsrep provider"); + return 1; + } + + /* REPLICATION: now we can connect to the cluster and start receiving + * replication events */ + if (node_wsrep_connect(node.wsrep, opts.address, opts.bootstrap) != + WSREP_OK) + { + NODE_FATAL("Failed to connect to primary component"); + return 1; + } + + /* REPLICATION: and start processing replicaiton events */ + struct node_worker_pool* slave_pool = + node_worker_start(&node, NODE_WORKER_SLAVE, (size_t)opts.slaves); + if (!slave_pool) + { + NODE_FATAL("Failed to create slave worker pool"); + return 1; + } + + /* REPLICATION: now that replicaton events are being processed we can + * wait to sync with the cluster */ + if (!node_wsrep_wait_synced(node.wsrep)) + { + NODE_ERROR("Failed to wait fir SYNCED event"); + return 1; + } + + NODE_INFO("Synced with cluster"); + + /* REPLICATION: now we can start replicate own events */ + struct node_worker_pool* master_pool = + node_worker_start(&node, NODE_WORKER_MASTER, (size_t)opts.masters); + if (opts.masters > 0 && !master_pool) + { + NODE_FATAL("Failed to create master worker pool"); + return 1; + } + + node_stats_loop(&node, (int)opts.period); + + /* REPLICATON: to shut down we go in the opposite order: + * first - disconnect from the cluster to signal master threads + * to exit loop, + * second - join master and slave threads, + * third - close provider once not in use */ + node_wsrep_disconnect(node.wsrep); + + node_worker_stop(master_pool); + node_worker_stop(slave_pool); + + node_wsrep_close(node.wsrep); + + /* and finally, when the storage can no longer be disturbed, close it */ + node_store_close(node.store); + + return 0; +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/node.sh b/wsrep-lib/wsrep-API/v26/examples/node/node.sh new file mode 100755 index 00000000..40e0a498 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/node.sh @@ -0,0 +1,40 @@ +#!/bin/sh -eu + +NODE_ID=$1 + +NODE_NAME=${NODE_NAME:-$NODE_ID} + +NODE_DIR=${NODE_DIR:-/tmp/node/$NODE_NAME} +rm -rf $NODE_DIR/* +mkdir -p $NODE_DIR + +NODE_OPT=${NODE_OPT:-} + +NODE_HOST=${NODE_HOST:-localhost} +NODE_PORT=${NODE_PORT:-$((10000 + $NODE_ID))} + +NODE_CLIENTS=${NODE_CLIENTS:-1} +NODE_APPLIERS=${NODE_APPLIERS:-1} + +NODE_ADDR=${NODE_ADDR:-} + +NODE_BIN=${NODE_BIN:-$(dirname $0)/node} + +# convert possible relative path to absolute path +NODE_PROVIDER=$(realpath $NODE_PROVIDER) + +set -x + +$NODE_BIN \ +-v "$NODE_PROVIDER" \ +-n "$NODE_NAME" \ +-f "$NODE_DIR" \ +-o "$NODE_OPT" \ +-t "$NODE_HOST" \ +-p $NODE_PORT \ +-s $NODE_APPLIERS \ +-m $NODE_CLIENTS \ +-d 10 \ +-a "$NODE_ADDR" + +set +x diff --git a/wsrep-lib/wsrep-API/v26/examples/node/options.c b/wsrep-lib/wsrep-API/v26/examples/node/options.c new file mode 100644 index 00000000..0bd08ffb --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/options.c @@ -0,0 +1,291 @@ +/* Copyright (c) 2019-2020, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "options.h" + +#include <ctype.h> // isspace() +#include <errno.h> +#include <getopt.h> +#include <stdio.h> +#include <stdlib.h> // strtol() +#include <string.h> // strcmp() + +/* + * getopt_long() declarations begin + */ + +#define OPTS_NA no_argument +#define OPTS_RA required_argument +#define OPTS_OA optional_argument + +typedef enum opt +{ + OPTS_NOOPT = 0, + OPTS_ADDRESS = 'a', + OPTS_BOOTSTRAP = 'b', + OPTS_DELAY = 'd', + OPTS_DATA_DIR = 'f', + OPTS_HELP = 'h', + OPTS_PERIOD = 'i', + OPTS_MASTERS = 'm', + OPTS_NAME = 'n', + OPTS_OPTIONS = 'o', + OPTS_BASE_PORT = 'p', + OPTS_RECORDS = 'r', + OPTS_SLAVES = 's', + OPTS_BASE_HOST = 't', + OPTS_PROVIDER = 'v', + OPTS_WS_SIZE = 'w', + OPTS_OPS = 'x' +} + opt_t; + +static struct option s_opts[] = +{ + { "address", OPTS_RA, NULL, OPTS_ADDRESS }, + { "bootstrap", OPTS_NA, NULL, OPTS_BOOTSTRAP }, + { "delay", OPTS_RA, NULL, OPTS_DELAY }, + { "storage", OPTS_RA, NULL, OPTS_DATA_DIR }, + { "help", OPTS_NA, NULL, OPTS_HELP }, + { "period", OPTS_RA, NULL, OPTS_PERIOD }, + { "masters", OPTS_RA, NULL, OPTS_MASTERS }, + { "name", OPTS_RA, NULL, OPTS_NAME }, + { "options", OPTS_RA, NULL, OPTS_OPTIONS, }, + { "base-port", OPTS_RA, NULL, OPTS_BASE_PORT }, + { "records", OPTS_RA, NULL, OPTS_RECORDS }, + { "slaves", OPTS_RA, NULL, OPTS_SLAVES }, + { "base-host", OPTS_RA, NULL, OPTS_BASE_HOST }, + { "provider", OPTS_RA, NULL, OPTS_PROVIDER }, + { "size", OPTS_RA, NULL, OPTS_WS_SIZE }, + { "ops", OPTS_RA, NULL, OPTS_OPS }, + { NULL, 0, NULL, 0 } +}; + +static const char* opts_string = "a:d:f:hi:m:n:o:p:r:s:t:v:w:x:"; + +/* + * getopt_long() declarations end + */ + +static const struct node_options opts_defaults = +{ + .provider = "none", + .address = "", + .options = "", + .name = "unnamed", + .data_dir = ".", + .base_host = "localhost", + .masters = 0, + .slaves = 1, + .ws_size = 1024, + .records = 1024*1024, + .delay = 0, + .base_port = 4567, + .period = 10, + .operations= 1, + .bootstrap = true +}; + +static void +opts_print_help(FILE* out, const char* prog_name) +{ + fprintf( + out, + "Usage: %s [OPTION...]\n" + "\n" + " -h, --help this thing.\n" + " -v, --provider=PATH a path to wsrep provider library file.\n" + " -a, --address=STRING list of node addresses in the group.\n" + " If not set the node assumes that it is the first\n" + " node in the group (default)\n" + " -o, --options=STRING a string of wsrep provider options.\n" + " -n, --name=STRING human-readable node name.\n" + " -f, --data-dir=PATH a directory to save working data in.\n" + " Should be private to the process.\n" + " -t, --base-host=ADDRESS address of this node at which other members can\n" + " connect to it\n" + " -p, --base-port=NUM base port which the node shall listen for\n" + " connections from other members. This port will be\n" + " used for replication, port+1 for IST and port+2\n" + " for SST. Default: 4567\n" + " -m, --masters=NUM number of concurrent master workers.\n" + " -s, --slaves=NUM number of concurrent slave workers.\n" + " (can't be less than 1)\n" + " -w, --size=NUM desirable size of the resulting writesets\n" + " (approximate lower boundary). Default: 1K\n" + " -r, --records=NUM number of records in the store. Default: 1M\n" + " -x, --ops=NUM number of operations per transaction. Default: 1\n" + " -d, --delay=NUM delay in milliseconds between \"commits\"\n" + " (per master thread).\n" + " -b, --bootstrap bootstrap the cluster with this node.\n" + " Default: 'Yes' if --address is not given, 'No'\n" + " otherwise.\n" + " -i, --period period in seconds between performance stats output\n" + "\n" + , prog_name); +} + +static void +opts_print_config(FILE* out, const struct node_options* opts) +{ + fprintf( + out, + "Continuing with the following configuration:\n" + "provider: %s\n" + "address: %s\n" + "options: %s\n" + "name: %s\n" + "data dir: %s\n" + "base addr: %s:%ld\n" + "masters: %ld\n" + "slaves: %ld\n" + "writeset size: %ld bytes\n" + "records: %ld\n" + "operations: %ld\n" + "commit delay: %ld ms\n" + "stats period: %ld s\n" + "bootstrap: %s\n" + , + opts->provider, opts->address, opts->options, opts->name, opts->data_dir, + opts->base_host, opts->base_port, + opts->masters, opts->slaves, opts->ws_size, opts->records, + opts->operations, + opts->delay, opts->period, opts->bootstrap ? "Yes" : "No" + ); +} + +static int +opts_check_conversion(int cond, const char* ptr, int idx) +{ + if (!cond || errno || (*ptr != '\0' && !isspace(*ptr))) + { + fprintf(stderr, "Bad value for %s option.\n", s_opts[idx].name); + return EINVAL; + } + return 0; +} + +int +node_options_read(int argc, char* argv[], struct node_options* opts) +{ + *opts = opts_defaults; + + int opt = 0; + int opt_idx = 0; + char* endptr; + int ret = 0; + + bool address_given = false; + bool bootstrap_given = false; + + while ((opt = getopt_long(argc, argv, opts_string, s_opts, &opt_idx)) != -1) + { + switch (opt) + { + case OPTS_ADDRESS: + address_given = strcmp(opts->address, optarg); + opts->address = optarg; + break; + case OPTS_BOOTSTRAP: + bootstrap_given = true; + opts->bootstrap = true; + break; + case OPTS_DELAY: + opts->delay = strtol(optarg, &endptr, 10); + if ((ret = opts_check_conversion(opts->delay >= 0, endptr, opt_idx))) + goto err; + break; + case OPTS_DATA_DIR: + opts->data_dir = optarg; + break; + case OPTS_HELP: + ret = 1; + goto help; + case OPTS_PERIOD: + opts->period = strtol(optarg, &endptr, 10); + if ((ret = opts_check_conversion(opts->period > 0, endptr, opt_idx))) + goto err; + break; + case OPTS_MASTERS: + opts->masters = strtol(optarg, &endptr, 10); + if ((ret = opts_check_conversion(opts->masters >= 0, endptr, + opt_idx))) + goto err; + break; + case OPTS_NAME: + opts->name = optarg; + break; + case OPTS_OPTIONS: + opts->options = optarg; + break; + case OPTS_BASE_PORT: + opts->base_port = strtol(optarg, &endptr, 10); + if ((ret = opts_check_conversion( + opts->base_port > 0 && opts->base_port < 65536, + endptr, opt_idx))) + goto err; + break; + case OPTS_RECORDS: + opts->records = strtol(optarg, &endptr, 10); + if ((ret = opts_check_conversion(opts->records >= 0, endptr, + opt_idx))) + goto err; + break; + case OPTS_SLAVES: + opts->slaves = strtol(optarg, &endptr, 10); + if ((ret = opts_check_conversion(opts->slaves > 0, endptr, opt_idx))) + goto err; + break; + case OPTS_BASE_HOST: + opts->base_host = optarg; + break; + case OPTS_PROVIDER: + opts->provider = optarg; + break; + case OPTS_WS_SIZE: + opts->ws_size = strtol(optarg, &endptr, 10); + if ((ret = opts_check_conversion(opts->ws_size > 0, endptr, + opt_idx))) + goto err; + break; + case OPTS_OPS: + opts->operations = strtol(optarg, &endptr, 10); + if ((ret = opts_check_conversion(opts->operations >= 1, endptr, + opt_idx))) + goto err; + break; + default: + ret = EINVAL; + } + } + +help: + if (ret) { + opts_print_help(stderr, argv[0]); + } + else + { + if (!bootstrap_given) + { + opts->bootstrap = !address_given; + } + opts_print_config(stdout, opts); + opts->delay *= 1000; /* convert to microseconds for usleep() */ + } + +err: + return ret; +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/options.h b/wsrep-lib/wsrep-API/v26/examples/node/options.h new file mode 100644 index 00000000..62172281 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/options.h @@ -0,0 +1,48 @@ +/* Copyright (c) 2019-2020, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines options interface + */ + +#ifndef NODE_OPTIONS_H +#define NODE_OPTIONS_H + +#include <stdbool.h> + +struct node_options +{ + const char* provider; // path to wsrep provider + const char* address; // wsrep cluster address string + const char* options; // wsrep option string + const char* name; // node name (for logging purposes) + const char* data_dir; // name of the storage file + const char* base_host;// host own address + long masters; // number of master threads + long slaves; // number of slave threads + long ws_size; // desired writeset size + long records; // total number of records + long delay; // delay between commits + long base_port;// base port to use + long period; // statistics output interval + long operations;// number of "statements" in a "transaction" + bool bootstrap;// bootstrap the cluster with this node +}; + +extern int +node_options_read(int argc, char* argv[], struct node_options* opts); + +#endif /* NODE_OPTIONS_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/socket.c b/wsrep-lib/wsrep-API/v26/examples/node/socket.c new file mode 100644 index 00000000..377abcaf --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/socket.c @@ -0,0 +1,304 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "socket.h" + +#include "log.h" + +#include <assert.h> +#include <ctype.h> // isspace() +#include <errno.h> +#include <limits.h> // USHRT_MAX +#include <netdb.h> // struct addrinfo +#include <stdio.h> // snprintf() +#include <string.h> // strerror() +#include <sys/socket.h> // bind(), connect(), accept(), send(), recv() + +struct node_socket +{ + int fd; +}; + +/** + * Initializes addrinfo from the separate host address and port arguments + * + * Requires calling freeaddrinfo() later + * + * @param[in] host - if NULL, will be initialized for listening + * @param[in] port + * + * @return struct addrinfo* or NULL in case of error + */ +static struct addrinfo* +socket_get_addrinfo2(const char* const host, + uint16_t const port) +{ + struct addrinfo const hints = + { + .ai_flags = AI_PASSIVE | /** will be ignored if host is not NULL */ + AI_NUMERICSERV, /** service is a numeric port */ + .ai_family = AF_UNSPEC, /** either IPv4 or IPv6 */ + .ai_socktype = SOCK_STREAM, /** STREAM or DGRAM */ + .ai_protocol = 0, + .ai_addrlen = 0, + .ai_addr = NULL, + .ai_canonname = NULL, + .ai_next = NULL + }; + + char service[6]; + snprintf(service, sizeof(service), "%hu", port); + + struct addrinfo* info; + int err = getaddrinfo(host, service, &hints, &info); + if (err) + { + NODE_ERROR("Failed to resolve '%s': %d (%s)", + host, err, gai_strerror(err)); + return NULL; + } + + return info; +} + +/** + * Initializes addrinfo from single address and port string + * The port is expected to be in numerical form and appended to the host address + * via colon. + * + * Requires calling freeaddrinfo() later + * + * @param[in] addr full address specification, including port + * + * @return struct addrinfo* or NULL in case of error + */ +static struct addrinfo* +socket_get_addrinfo1(const char* const addr) +{ + int const addr_len = (int)strlen(addr); + char* const addr_buf = strdup(addr); + if (!addr_buf) + { + NODE_ERROR("strdup(%s) failed: %d (%s)", addr, errno, strerror(errno)); + return NULL; + } + + struct addrinfo* res = NULL; + long port; + char* endptr; + + int i; + for (i = addr_len - 1; i >= 0; i--) + { + if (addr_buf[i] == ':') break; + } + + if (addr_buf[i] != ':') + { + NODE_ERROR("Malformed address:port string: '%s'", addr); + goto end; + } + + addr_buf[i] = '\0'; + port = strtol(addr_buf + i + 1, &endptr, 10); + + if (port <= 0 || port > USHRT_MAX || errno || + (*endptr != '\0' && !isspace(*endptr))) + { + NODE_ERROR("Malformed/invalid port: '%s'. Errno: %d (%s)", + addr_buf + i + 1, errno, strerror(errno)); + goto end; + } + + res = socket_get_addrinfo2(strlen(addr_buf) > 0 ? addr_buf : NULL, + (uint16_t)port); +end: + free(addr_buf); + return res; +} + +static struct node_socket* +socket_create(int const fd) +{ + assert(fd > 0); + + struct node_socket* res = calloc(1, sizeof(struct node_socket)); + if (res) + { + res->fd = fd; + } + else + { + NODE_ERROR("Failed to allocate struct node_socket: %d (%s)", + errno, strerror(errno)); + close(fd); + } + + return res; +} + +/** + * Definition of function type with the signature of bind() and connect() + */ +typedef int (*socket_act_fun_t) (int sfd, + const struct sockaddr* addr, + socklen_t addrlen); + +static int +socket_bind_and_listen(int const sfd, + const struct sockaddr* const addr, + socklen_t const addrlen) +{ + int ret = bind(sfd, addr, addrlen); + + if (!ret) + ret = listen(sfd, SOMAXCONN); + + return ret; +} + +/** + * A "template" method to do the "right thing" with the addrinfo and create a + * socket from it. The "right thing" would normally be bind and listen for + * a server socket OR connect for a client socket. + * + * @param[in] info addrinfo list, swallowed and deallocated + * @param[in] action_fun the "right thing" to do on socket and struct sockaddr + * @param[in] action_str action description to be printed in the error message + * @param[in] orig_host host address to be pronted in the error message + * @param[in] orig_port port to be printed in the error message, if orig_host + * string contains the port, this parameter should be 0 + * + * The last three parameters are for diagnostic puposes only. orig_host and + * orig_port are supposed to be what were used to obtain addrinfo. + * + * @return new struct node_socket. + */ +static struct node_socket* +socket_from_addrinfo(struct addrinfo* const info, + socket_act_fun_t const action_fun, + const char* const action_str, + const char* const orig_host, + uint16_t const orig_port) +{ + int sfd; + int err = 0; + + /* Iterate over addrinfo list and try to apply action_fun on the resulting + * socket. Once successful, break loop. */ + struct addrinfo* addr; + for (addr = info; addr != NULL; addr = addr->ai_next) + { + sfd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + if (sfd == -1) + { + err = errno; + continue; + } + + if (action_fun(sfd, addr->ai_addr, addr->ai_addrlen) == 0) break; + + err = errno; + close(sfd); + } + + freeaddrinfo(info); /* no longer needed */ + + if (!addr) + { + NODE_ERROR("Failed to %s to '%s%s%.0hu': %d (%s)", + action_str, + orig_host ? orig_host : "", orig_port > 0 ? ":" : "", + orig_port > 0 ? orig_port : 0, /* won't be printed if 0 */ + err, strerror(err)); + return NULL; + } + + assert(sfd > 0); + return socket_create(sfd); +} + +struct node_socket* +node_socket_listen(const char* const host, uint16_t const port) +{ + struct addrinfo* const info = socket_get_addrinfo2(host, port); + if (!info) return NULL; + + return socket_from_addrinfo(info, socket_bind_and_listen, + "bind a listening socket", host, port); +} + +struct node_socket* +node_socket_connect(const char* const addr_str) +{ + struct addrinfo* const info = socket_get_addrinfo1(addr_str); + if (!info) return NULL; + + return socket_from_addrinfo(info, connect, "connect", addr_str, 0); +} + +struct node_socket* +node_socket_accept(struct node_socket* socket) +{ + int sfd = accept(socket->fd, NULL, NULL); + + if (sfd < 0) + { + NODE_ERROR("Failed to accept connection: %d (%s)", + errno, strerror(errno)); + return NULL; + } + + return socket_create(sfd); +} + +int +node_socket_send_bytes(node_socket_t* socket, const void* buf, size_t len) +{ + ssize_t const ret = send(socket->fd, buf, len, MSG_NOSIGNAL); + + if (ret != (ssize_t)len) + { + NODE_ERROR("Failed to send %zu bytes: %d (%s)", errno, strerror(errno)); + return -1; + } + + return 0; +} + +int +node_socket_recv_bytes(node_socket_t* socket, void* buf, size_t len) +{ + ssize_t const ret = recv(socket->fd, buf, len, MSG_WAITALL); + + if (ret != (ssize_t)len) + { + NODE_ERROR("Failed to recv %zu bytes: %d (%s)", errno, strerror(errno)); + return -1; + } + + return 0; +} + +void +node_socket_close(node_socket_t* socket) +{ + if (!socket) return; + + if (socket->fd > 0) close(socket->fd); + + free(socket); +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/socket.h b/wsrep-lib/wsrep-API/v26/examples/node/socket.h new file mode 100644 index 00000000..3a77eff3 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/socket.h @@ -0,0 +1,72 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit implements auxiliary networking functions (for SST purposes) + * It has nothing wsrep related and is not of general purpose. + */ + +#ifndef NODE_SOCKET_H +#define NODE_SOCKET_H + +#include <stddef.h> // size_t +#include <stdint.h> // uint16_t + +typedef struct node_socket node_socket_t; + +/** + * Open listening socket at a given address + * + * @return listening socket + */ +extern node_socket_t* +node_socket_listen(const char* host, uint16_t port); + +/** + * Connect to a given address. + * + * @return connected socket + */ +extern node_socket_t* +node_socket_connect(const char* addr); + +/** + * Wait for connection on a listening socket + * @return connected socket + */ +extern node_socket_t* +node_socket_accept(node_socket_t* s); + +/** + * Send a given number of bytes + * @return 0 or a negative error code + */ +extern int +node_socket_send_bytes(node_socket_t* s, const void* buf, size_t len); + +/** + * Receive a given number of bytes + * @return 0 or a negative error code + */ +extern int +node_socket_recv_bytes(node_socket_t* s, void* buf, size_t len); + +/** + * Release all recources associated with the socket */ +extern void +node_socket_close(node_socket_t* s); + +#endif /* NODE_SOCKET_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/sst.c b/wsrep-lib/wsrep-API/v26/examples/node/sst.c new file mode 100644 index 00000000..e93534ef --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/sst.c @@ -0,0 +1,372 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "sst.h" + +#include "ctx.h" +#include "log.h" +#include "socket.h" + +#include <arpa/inet.h> // htonl() +#include <assert.h> +#include <errno.h> +#include <pthread.h> +#include <stdio.h> // snprintf() +#include <stdlib.h> // abort() +#include <string.h> // strdup() +#include <unistd.h> // usleep() + +/** + * Helper: creates detached thread */ +static int +sst_create_thread(void* (*thread_routine) (void*), + void* const thread_arg) +{ + pthread_t thr; + pthread_attr_t attr; + int ret = pthread_attr_init(&attr); + ret = ret ? ret : pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); + ret = ret ? ret : pthread_create(&thr, &attr, thread_routine, thread_arg); + return ret; +} + +/** + * Helper: creates detached thread and waits for it to call + * sst_sync_with_parent() */ +static void +sst_create_and_sync(const char* const role, + pthread_mutex_t* const mtx, + pthread_cond_t* const cond, + void* (*thread_routine) (void*), + void* const thread_arg) +{ + int ret = pthread_mutex_lock(mtx); + if (ret) + { + NODE_FATAL("Failed to lock %s mutex: %d (%s)", role, ret, strerror(ret)); + abort(); + } + + ret = sst_create_thread(thread_routine, thread_arg); + if (ret) + { + NODE_FATAL("Failed to create detached %s thread: %d (%s)", + role, ret, strerror(ret)); + abort(); + } + + ret = pthread_cond_wait(cond, mtx); + if (ret) + { + NODE_FATAL("Failed to synchronize with %s thread: %d (%s)", + role, ret, strerror(ret)); + abort(); + } + + pthread_mutex_unlock(mtx); +} + +/** + * Helper: syncs with parent thread and allows it to continue and return + * asynchronously */ +static void +sst_sync_with_parent(const char* role, + pthread_mutex_t* mtx, + pthread_cond_t* cond) +{ + int ret = pthread_mutex_lock(mtx); + if (ret) + { + NODE_FATAL("Failed to lock %s mutex: %d (%s)", role, ret, strerror(ret)); + abort(); + } + + NODE_INFO("Initialized %s thread", role); + + pthread_cond_signal(cond); + pthread_mutex_unlock(mtx); +} + +static pthread_mutex_t sst_joiner_mtx = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t sst_joiner_cond = PTHREAD_COND_INITIALIZER; + +struct sst_joiner_ctx +{ + struct node_ctx* node; + node_socket_t* socket; +}; + +/** + * waits for SST completion and signals the provider to continue */ +static void* +sst_joiner_thread(void* ctx) +{ + assert(ctx); + + struct node_ctx* const node = ((struct sst_joiner_ctx*)ctx)->node; + node_socket_t* const listen = ((struct sst_joiner_ctx*)ctx)->socket; + ctx = NULL; /* may be unusable after next statement */ + + /* this allows parent callback to return */ + sst_sync_with_parent("JOINER", &sst_joiner_mtx, &sst_joiner_cond); + + wsrep_gtid_t state_gtid = WSREP_GTID_UNDEFINED; + int err = -1; + + /* REPLICATION: wait for donor to connect and send the state snapshot */ + node_socket_t* const connected = node_socket_accept(listen); + if (!connected) goto end; + + uint32_t state_len; + err = node_socket_recv_bytes(connected, &state_len, sizeof(state_len)); + if (err) goto end; + + state_len = ntohl(state_len); + if (state_len > 0) + { + /* REPLICATION: get the state of state_len size */ + void* state = malloc(state_len); + if (state) + { + err = node_socket_recv_bytes(connected, state, state_len); + if (err) + { + free(state); + goto end; + } + + /* REPLICATION: install the newly received state. */ + err = node_store_init_state(node->store, state, state_len); + free(state); + if (err) goto end; + } + else + { + NODE_ERROR("Failed to allocate %zu bytes for state snapshot.", + state_len); + err = -ENOMEM; + goto end; + } + } + else + { + /* REPLICATION: it was a bypass, the node will receive missing data via + * IST. It starts with the state it currently has. */ + } + + /* REPLICATION: find gtid of the received state to report to provider */ + node_store_gtid(node->store, &state_gtid); + +end: + assert(err <= 0); + node_socket_close(connected); + node_socket_close(listen); + + /* REPLICATION: tell provider that SST is received */ + wsrep_status_t sst_ret; + wsrep_t* const wsrep = node_wsrep_provider(node->wsrep); + sst_ret = wsrep->sst_received(wsrep, &state_gtid, NULL, err); + + if (WSREP_OK != sst_ret) + { + NODE_FATAL("Failed to report completion of SST: %d", sst_ret); + abort(); + } + + return NULL; +} + +enum wsrep_cb_status +node_sst_request_cb (void* const app_ctx, + void** const sst_req, + size_t* const sst_req_len) +{ + static int const SST_PORT_OFFSET = 2; + + assert(app_ctx); + struct node_ctx* const node = app_ctx; + const struct node_options* const opts = node->opts; + + char* sst_str = NULL; + + /* REPLICATION: 1. prepare the node to receive SST */ + uint16_t const sst_port = (uint16_t)(opts->base_port + SST_PORT_OFFSET); + size_t const sst_len = strlen(opts->base_host) + + 1 /* ':' */ + 5 /* max port len */ + 1 /* \0 */; + sst_str = malloc(sst_len); + if (!sst_str) + { + NODE_ERROR("Failed to allocate %zu bytes for SST request", sst_len); + goto end; + } + + /* write in request the address at which we listen */ + int ret = snprintf(sst_str, sst_len, "%s:%hu", opts->base_host, sst_port); + if (ret < 0 || (size_t)ret >= sst_len) + { + free(sst_str); + sst_str = NULL; + NODE_ERROR("Failed to write a SST request"); + goto end; + } + + node_socket_t* const socket = node_socket_listen(NULL, sst_port); + if (!socket) + { + free(sst_str); + sst_str = NULL; + NODE_ERROR("Failed to listen at %s", sst_str); + goto end; + } + + /* REPLICATION 2. start the "joiner" thread that will wait for SST and + * report its success to provider, and syncronize with it. */ + struct sst_joiner_ctx ctx = + { + .node = node, + .socket = socket + }; + sst_create_and_sync("JOINER", &sst_joiner_mtx, &sst_joiner_cond, + sst_joiner_thread, &ctx); + + NODE_INFO("Waiting for SST at %s", sst_str); + +end: + if (sst_str) + { + *sst_req = sst_str; + *sst_req_len = strlen(sst_str) + 1; + } + else + { + *sst_req = NULL; + *sst_req_len = 0; + return WSREP_CB_FAILURE; + } + + /* REPLICATION 3. return SST request to provider */ + return WSREP_CB_SUCCESS; +} + +static pthread_mutex_t sst_donor_mtx = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t sst_donor_cond = PTHREAD_COND_INITIALIZER; + +struct sst_donor_ctx +{ + wsrep_gtid_t state; + struct node_ctx* node; + node_socket_t* socket; + wsrep_bool_t bypass; +}; + +/** + * donates SST and signals provider that it is done. */ +static void* +sst_donor_thread(void* const args) +{ + struct sst_donor_ctx const ctx = *(struct sst_donor_ctx*)args; + + int err = 0; + const void* state; + size_t state_len; + + if (ctx.bypass) + { + /* REPLICATION: if bypass is true, there is no need to send snapshot, + * just signal the joiner that snapshot is not needed and + * it can proceed to apply IST. We'll do it by sending 0 + * for the size of snapshot */ + state = NULL; + state_len = 0; + } + else + { + /* REPLICATION: if bypass is false, we need to send a full state snapshot + * Get hold of the state, which is currently just GTID + * NOTICE that while parent is waiting, the store is in a + * quiescent state, provider blocking any modifications. */ + err = node_store_acquire_state(ctx.node->store, &state, &state_len); + if (state_len > UINT32_MAX) err = -ERANGE; + } + + /* REPLICATION: after getting hold of the state we can allow parent callback + * to return and the node to resume its normal operation */ + sst_sync_with_parent("DONOR", &sst_donor_mtx, &sst_donor_cond); + + if (err >= 0) + { + uint32_t tmp = htonl((uint32_t)state_len); + err = node_socket_send_bytes(ctx.socket, &tmp, sizeof(tmp)); + } + + if (state_len != 0) + { + if (err >= 0) + { + assert(state); + err = node_socket_send_bytes(ctx.socket, state, state_len); + } + + node_store_release_state(ctx.node->store); + } + + node_socket_close(ctx.socket); + + /* REPLICATION: signal provider the success of the operation */ + wsrep_t* const wsrep = node_wsrep_provider(ctx.node->wsrep); + wsrep->sst_sent(wsrep, &ctx.state, err); + + return NULL; +} + +enum wsrep_cb_status +node_sst_donate_cb (void* const app_ctx, + void* const recv_ctx, + const wsrep_buf_t* const str_msg, + const wsrep_gtid_t* const state_id, + const wsrep_buf_t* const state, + wsrep_bool_t const bypass) +{ + (void)recv_ctx; + (void)state; + + struct sst_donor_ctx ctx = + { + .node = app_ctx, + .state = *state_id, + .bypass = bypass + }; + + /* we are expecting a human-readable 0-terminated string */ + void* p = memchr(str_msg->ptr, '\0', str_msg->len); + if (!p) + { + NODE_ERROR("Received a badly formed State Transfer Request."); + /* REPLICATION: in case of a failure we return the status to provider, so + * that the joining node can be notified of it by cluster */ + return WSREP_CB_FAILURE; + } + + const char* addr = str_msg->ptr; + ctx.socket = node_socket_connect(addr); + + if (!ctx.socket) return WSREP_CB_FAILURE; + + sst_create_and_sync("DONOR", &sst_donor_mtx, &sst_donor_cond, + sst_donor_thread, &ctx); + + return WSREP_CB_SUCCESS; +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/sst.h b/wsrep-lib/wsrep-API/v26/examples/node/sst.h new file mode 100644 index 00000000..7006a1b6 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/sst.h @@ -0,0 +1,39 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines SST interface + */ + +#ifndef NODE_SST_H +#define NODE_SST_H + +#include "../../wsrep_api.h" + +extern enum wsrep_cb_status +node_sst_request_cb (void* app_ctx, + void** sst_req, + size_t* sst_req_len); + +extern enum wsrep_cb_status +node_sst_donate_cb (void* app_ctx, + void* recv_ctx, + const wsrep_buf_t* str_msg, + const wsrep_gtid_t* state_id, + const wsrep_buf_t* state, + wsrep_bool_t bypass); + +#endif /* NODE_SST_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/stats.c b/wsrep-lib/wsrep-API/v26/examples/node/stats.c new file mode 100644 index 00000000..4b02240f --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/stats.c @@ -0,0 +1,215 @@ +/* Copyright (c) 2019-2020, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "stats.h" + +#include "log.h" + +#include <assert.h> +#include <errno.h> +#include <stdio.h> // snprintf() +#include <stdlib.h> // abort() +#include <string.h> // strcmp() +#include <unistd.h> // usleep() + +enum +{ + STATS_REPL_BYTE, + STATS_REPL_WS, + STATS_RECV_BYTE, + STATS_RECV_WS, + STATS_TOTAL_BYTE, + STATS_TOTAL_WS, + STATS_CERT_FAILS, + STATS_STORE_FAILS, + STATS_FC_PAUSED, + STATS_MAX +}; + +static const char* const stats_legend[STATS_MAX] = +{ + " repl(B/s)", + " repl(W/s)", + " recv(B/s)", + " recv(W/s)", + "total(B/s)", + "total(W/s)", + " cert.fail", + " stor.fail", + " paused(%)" +}; + +/* stats IDs in provider output - provider dependent, here we use Galera's */ +static const char* const galera_ids[STATS_MAX] = +{ + "replicated_bytes", /**< STATS_REPL_BYTE */ + "replicated", /**< STATS_REPL_WS */ + "received_bytes", /**< STATS_RECV_BYTE */ + "received", /**< STATS_RECV_WS */ + "", /**< STATS_TOTAL_BYTE */ + "", /**< STATS_TOTAL_WS */ + "local_cert_failures", /**< STATS_CERT_FAILS */ + "", /**< STATS_STORE_FAILS */ + "flow_control_paused_ns" /**< STATS_FC_PAUSED */ +}; + +/* maps local stats IDs to provider stat IDs */ +static int stats_galera_map[STATS_MAX]; + +/** + * Helper to map provider stats to own stats set */ +static void +stats_establish_mapping(wsrep_t* const wsrep) +{ + int const magic_map = -1; + size_t i; + for (i = 0; i < sizeof(stats_galera_map)/sizeof(stats_galera_map[0]); i++) + { + stats_galera_map[i] = magic_map; /* initialize map array */ + } + + struct wsrep_stats_var* const stats = wsrep->stats_get(wsrep); + + /* to compensate for STATS_TOTAL_* and STATS_STORE_FAILS having no + * counterparts */ + int mapped = 3; + + i = 0; + while (stats[i].name) /* stats array is terminated by Null name */ + { + int j; + for (j = 0; j < STATS_MAX; j++) + { + if (magic_map == stats_galera_map[j] /* j-th member still unset */ + && + !strcmp(stats[i].name, galera_ids[j])) + { + stats_galera_map[j] = (int)i; + mapped++; + if (STATS_MAX == mapped) /* all mapped */ goto out; + } + } + + i++; + } + +out: + wsrep->stats_free(wsrep, stats); +} + +static void +stats_get(node_store_t* const store, wsrep_t* const wsrep, long long stats[]) +{ + stats[STATS_STORE_FAILS] = node_store_read_view_failures(store); + + struct wsrep_stats_var* const ret = wsrep->stats_get(wsrep); + if (!ret) + { + NODE_FATAL("wsrep::stats_get() call failed."); + abort(); + } + + int i; + for (i = 0; i < STATS_MAX; i++) + { + int j = stats_galera_map[i]; + if (j >= 0) + { + assert(WSREP_VAR_INT64 == ret[j].type); + stats[i] = ret[j].value._int64; + } + } + + wsrep->stats_free(wsrep, ret); + + // totals are just sums + stats[STATS_TOTAL_BYTE] = stats[STATS_REPL_BYTE] + stats[STATS_RECV_BYTE]; + stats[STATS_TOTAL_WS ] = stats[STATS_REPL_WS ] + stats[STATS_RECV_WS ]; +} + +static void +stats_print(long long bef[], long long aft[], double period) +{ + double rate[STATS_MAX]; + int i; + for (i = 0; i < STATS_MAX; i++) + { + rate[i] = (double)(aft[i] - bef[i])/period; + } + rate[STATS_FC_PAUSED] /= 1.0e+07; // nanoseconds to % of seconds + + char str[256]; + int written = 0; + + /* first line write legend */ + for (i = 0; i < STATS_MAX; i++) + { + size_t const space_left = sizeof(str) - (size_t)written; + written += snprintf(&str[written], space_left, "%s", stats_legend[i]); + } + + str[written] = '\n'; + written++; + + /* second line write values */ + for (i = 0; i < STATS_MAX; i++) + { + size_t const space_left = sizeof(str) - (size_t)written; + long long const value = (long long)rate[i]; + written += snprintf(&str[written], space_left, " %9lld", value); + } + + str[written] = '\0'; + + /* use logging macro for timestamp */ + NODE_INFO("\n%s", str); +} + +void +node_stats_loop(const struct node_ctx* const node, int const period) +{ + double const period_sec = period; + useconds_t const period_usec = (useconds_t)period * 1000000; + + wsrep_t* const wsrep = node_wsrep_provider(node->wsrep); + stats_establish_mapping(wsrep); + + long long stats1[STATS_MAX]; + long long stats2[STATS_MAX]; + + stats_get(node->store, wsrep, stats1); + + while (1) + { + if (usleep(period_usec)) break; + stats_get(node->store, wsrep, stats2); + stats_print(stats1, stats2, period_sec); + + if (usleep(period_usec)) break; + stats_get(node->store, wsrep, stats1); + stats_print(stats2, stats1, period_sec); + } + + if (EINTR != errno) + { + NODE_ERROR("Unexpected usleep(%lld) error: %d (%s)", + (long long)period_usec, errno, strerror(errno)); + } + else + { + /* interrupted by signal */ + } +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/stats.h b/wsrep-lib/wsrep-API/v26/examples/node/stats.h new file mode 100644 index 00000000..f7ab7ef4 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/stats.h @@ -0,0 +1,35 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines performance statistics loop + */ + +#ifndef NODE_STATS_H +#define NODE_STATS_H + +#include "ctx.h" + +/** + * Prints out statistics with a given period. + * + * @param[in] node node context + * @param[in] period in seconds + */ +extern void +node_stats_loop(const struct node_ctx* node, int period); + +#endif /* NODE_STATS_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/store.c b/wsrep-lib/wsrep-API/v26/examples/node/store.c new file mode 100644 index 00000000..1dc2d6c1 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/store.c @@ -0,0 +1,1044 @@ +/* Copyright (c) 2019-2020, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "store.h" + +#include "log.h" + +#include <assert.h> +#include <errno.h> +#include <pthread.h> +#include <stdbool.h> +#include <stddef.h> // ptrdiff_t +#include <stdint.h> // uintptr_t +#include <stdlib.h> // abort() +#include <string.h> // memset() + +#define DECLARE_SERIALIZE_INT(INTTYPE) \ + static inline size_t \ + store_serialize_##INTTYPE(void* const to, INTTYPE##_t const from) \ + { \ + memcpy(to, &from, sizeof(from)); /* for simplicity ignore endianness */ \ + return sizeof(from); \ + } + +DECLARE_SERIALIZE_INT(uint32); +DECLARE_SERIALIZE_INT(int64); + +#define DECLARE_DESERIALIZE_INT(INTTYPE) \ + static inline size_t \ + store_deserialize_##INTTYPE(INTTYPE##_t* const to, const void* const from) \ + { \ + memcpy(to, from, sizeof(*to)); /* for simplicity ignore endianness */ \ + return sizeof(*to); \ + } + +DECLARE_DESERIALIZE_INT(uint32); +DECLARE_DESERIALIZE_INT(int64); + +typedef struct record +{ + wsrep_seqno_t version; + uint32_t value; + /* this order ensures that there is no padding between the members */ +} +record_t; + +#define STORE_RECORD_SIZE \ + (sizeof(((record_t*)(NULL))->version) + sizeof(((record_t*)(NULL))->value)) + +static inline size_t +store_record_set(void* const base, + size_t const index, + const record_t* const record) +{ + char* const position = (char*)base + index*STORE_RECORD_SIZE; + memcpy(position, record, STORE_RECORD_SIZE); + return STORE_RECORD_SIZE; +} + +static inline size_t +store_record_get(const void* const base, + size_t const index, + record_t* const record) +{ + const char* const position = (const char*)base + index*STORE_RECORD_SIZE; + memcpy(record, position, STORE_RECORD_SIZE); + return STORE_RECORD_SIZE; +} + +static inline bool +store_record_equal(const record_t* const lhs, const record_t* const rhs) +{ + return (lhs->version == rhs->version) && (lhs->value == rhs->value); +} + +/* transaction context */ +struct store_trx_op +{ + /* Normally what we'd need for transaction context is the record index and + * new record value. Here we also save read view snapshot (rec_from & rec_to) + * to + * 1. test provider certification correctness if provider supports read view + * 2. if not, detect conflicts at a store level. */ + record_t rec_from; + record_t rec_to; + uint32_t idx_from; + uint32_t idx_to; + uint32_t new_value; + uint32_t size; /* nominal "size" of operation to manipulate on-the-wire + * writeset size. */ +}; + +#define STORE_OP_SIZE (STORE_RECORD_SIZE + STORE_RECORD_SIZE + \ + sizeof(((struct store_trx_op*)NULL)->idx_from) + \ + sizeof(((struct store_trx_op*)NULL)->idx_to) + \ + sizeof(((struct store_trx_op*)NULL)->new_value) + \ + sizeof(((struct store_trx_op*)NULL)->size)) + +struct store_trx_ctx +{ + wsrep_gtid_t rv_gtid; + size_t ops_num; + struct store_trx_op* ops; +}; + +static inline bool +store_trx_add_op(struct store_trx_ctx* const trx) +{ + struct store_trx_op* const new_ops = + realloc(trx->ops, sizeof(struct store_trx_op)*(trx->ops_num + 1)); + + if (new_ops) + { + trx->ops = new_ops; +#ifndef NDEBUG + memset(&trx->ops[trx->ops_num], 0, sizeof(*trx->ops)); +#endif + trx->ops_num++; + } + + return (NULL == new_ops); +} + +struct store_trx_entry +{ + bool used; + struct store_trx_ctx ctx; +}; + +typedef wsrep_uuid_t member_t; + +struct node_store +{ + wsrep_gtid_t gtid; + pthread_mutex_t gtid_mtx; + wsrep_trx_id_t trx_id; + pthread_mutex_t trx_id_mtx; + char* snapshot; + member_t* members; + void* records; + size_t op_size; + long read_view_fails; + uint32_t members_num; + uint32_t records_num; + uint32_t entries_mask; + bool read_view_support; // read view support by cluster + /* trx pool piggybacked */ +}; + +node_store_t* +node_store_open(const struct node_options* const opts) +{ + /* make the size of trx pool the next highest power of 2 over the total + * number of workers */ + uint32_t trx_pool_mask = (uint32_t)(opts->masters + opts->slaves); + if (trx_pool_mask > 0) + { + trx_pool_mask -= 1; + trx_pool_mask |= trx_pool_mask >> 1; + trx_pool_mask |= trx_pool_mask >> 2; + trx_pool_mask |= trx_pool_mask >> 4; + trx_pool_mask |= trx_pool_mask >> 8; + trx_pool_mask |= trx_pool_mask >> 16; + } + assert(((trx_pool_mask + 1) & trx_pool_mask) == 0); // 2^n - 1 + + size_t const desired_op_size = (size_t)(opts->ws_size/opts->operations); + size_t const op_size = (desired_op_size > STORE_OP_SIZE ? + desired_op_size : STORE_OP_SIZE); + + /* since the number of workers will never change, we can allocate trx pool + * together with the main store struc */ + size_t const store_alloc_size = sizeof(struct node_store) + + /* op_size - additional buffer for op serialization per trx */ + (sizeof(struct store_trx_entry) + op_size)*(trx_pool_mask + 1); + + struct node_store* const ret = malloc(store_alloc_size); + + if (ret) + { + memset(ret, 0, store_alloc_size); + ret->records = malloc((size_t)opts->records * STORE_RECORD_SIZE); + + if (ret->records) + { + ret->gtid = WSREP_GTID_UNDEFINED; + pthread_mutex_init(&ret->gtid_mtx, NULL); + pthread_mutex_init(&ret->trx_id_mtx, NULL); + ret->op_size = op_size; + ret->records_num = (uint32_t)opts->records; + ret->entries_mask = trx_pool_mask; + + uint32_t i; + for (i = 0; i < ret->records_num; i++) + { + /* keep state in serialized form for easy snapshotting */ + struct record const record = { WSREP_SEQNO_UNDEFINED, i }; + store_record_set(ret->records, i, &record); + } + + return ret; + } + else + { + free(ret); + } + } + + return NULL; +} + +void +node_store_close(struct node_store* const store) +{ + assert(store); + assert(store->records); + pthread_mutex_destroy(&store->gtid_mtx); + pthread_mutex_destroy(&store->trx_id_mtx); + free(store->records); + free(store->members); + free(store); +} + +#define STORE_MUTEX_LOCK(mtx) \ + { \ + int err = pthread_mutex_lock(mtx); \ + if (err) \ + { \ + NODE_FATAL("Failed to lock " #mtx ": %d (%s)", \ + err, strerror(err)); \ + abort(); \ + } \ + } + +static inline struct store_trx_entry* +store_get_trx_entry(struct node_store* const store, wsrep_trx_id_t const trx_id) +{ + return (struct store_trx_entry*) + ((char*)(store + 1) + (trx_id & store->entries_mask)* + (sizeof(struct store_trx_entry) + store->op_size)); +} + +static inline struct store_trx_ctx* +store_get_trx_ctx(struct node_store* const store, wsrep_trx_id_t const trx_id) +{ + return &(store_get_trx_entry(store, trx_id)->ctx); +} + +static inline wsrep_trx_id_t +store_new_trx_id(struct node_store* const store) +{ + wsrep_trx_id_t ret; + struct store_trx_entry* trx; + + STORE_MUTEX_LOCK(&store->trx_id_mtx); + + do + { + store->trx_id++; + trx = store_get_trx_entry(store, store->trx_id); + } + while (trx->used); + trx->used = true; + ret = store->trx_id; + + pthread_mutex_unlock(&store->trx_id_mtx); + + memset(&trx->ctx, 0, sizeof(trx->ctx)); + + return ret; +} + +static inline void +store_free_trx_id(struct node_store* const store, wsrep_trx_id_t const trx_id) +{ + struct store_trx_entry* const trx = store_get_trx_entry(store, trx_id); + assert(trx->used); + free(trx->ctx.ops); + + STORE_MUTEX_LOCK(&store->trx_id_mtx); + + trx->used = false; + + pthread_mutex_unlock(&store->trx_id_mtx); +} + +/** + * deserializes membership from snapshot */ +static int +store_new_members(const char* ptr, const char* const endptr, + uint32_t* const num, member_t** const memb) +{ + ptr += store_deserialize_uint32(num, ptr); + + if (*num < 2) + { + NODE_ERROR("Bogus number of members %u", *num); + return -1; + } + + int ret = (int)sizeof(*num); + + size_t const msize = sizeof(member_t) * *num; + if ((endptr - ptr) < (ptrdiff_t)msize) + { + NODE_ERROR("State snapshot does not contain all membership: " + "%zd < %zu", endptr - ptr, msize); + return -1; + } + + *memb = calloc(*num, sizeof(member_t)); + if (!*memb) + { + NODE_ERROR("Could not allocate new membership"); + return -ENOMEM; + } + + memcpy(*memb, ptr, msize); + + return ret + (int)msize; +} + +/** + * deserializes records from snapshot */ +static int +store_new_records(const char* ptr, const char* const endptr, + uint32_t* const num, void** const rec) +{ + ptr += store_deserialize_uint32(num, ptr); + + int ret = (int)sizeof(*num); + if (!*num) + { + *rec = NULL; + return ret; + } + + size_t const rsize = STORE_RECORD_SIZE * *num; + if ((endptr - ptr) < (ptrdiff_t)rsize) + { + NODE_ERROR("State snapshot does not contain all records: " + "%zu < %zu", endptr - ptr, rsize); + return -1; + } + + *rec = malloc(rsize); + if (!*rec) + { + NODE_ERROR("Could not allocate new records"); + return -ENOMEM; + } + + memcpy(*rec, ptr, rsize); + + return ret + (int)rsize; +} + +int +node_store_init_state(struct node_store* const store, + const void* const state, + size_t const state_len) +{ + /* First, deserialize and prepare new state */ + if (state_len <= sizeof(member_t)*2 /* at least two members */ + + WSREP_UUID_STR_LEN + 1 /* : */ + 1 /* seqno */ + 1 /* \0 */) + { + NODE_ERROR("State snapshot too short: %zu", state_len); + return -1; + } + + wsrep_gtid_t state_gtid; + int ret; + ret = wsrep_gtid_scan(state, state_len, &state_gtid); + if (ret < 0) + { + char state_str[WSREP_GTID_STR_LEN + 1] = { 0, }; + memcpy(state_str, state, sizeof(state_str) - 1); + NODE_ERROR("Could not find valid GTID in the received data: %s", + state_str); + return -1; + } + + ret++; /* \0 */ + if ((state_len - (size_t)ret) < sizeof(uint32_t)) + { + NODE_ERROR("State snapshot does not contain the number of members"); + return -1; + } + + const char* ptr = ((char*)state); + const char* const endptr = ptr + state_len; + ptr += ret; + + uint32_t m_num; + member_t* new_members; + ret = store_new_members(ptr, endptr, &m_num, &new_members); + if (ret < 0) + { + return ret; + } + ptr += ret; + + bool const read_view_support = ptr[0]; + ptr += 1; + + uint32_t r_num; + void* new_records; + ret = store_new_records(ptr, endptr, &r_num, &new_records); + if (ret < 0) + { + free(new_members); + return ret; + } + ptr += ret; + + STORE_MUTEX_LOCK(&store->gtid_mtx); + + /* just a sanity check */ + if (0 == wsrep_uuid_compare(&state_gtid.uuid, &store->gtid.uuid) && + state_gtid.seqno < store->gtid.seqno) + { + NODE_ERROR("Received snapshot that is in the past: my seqno %lld," + " received seqno: %lld", + (long long)store->gtid.seqno, (long long)state_gtid.seqno); + free(new_members); + free(new_records); + ret = -1; + } + else + { + free(store->members); + store->members_num = m_num; + store->members = new_members; + free(store->records); + store->records_num = r_num; + store->records = new_records; + store->gtid = state_gtid; + store->read_view_support = read_view_support; + ret = 0; + } + + pthread_mutex_unlock(&store->gtid_mtx); + + return ret; +} + +int +node_store_acquire_state(node_store_t* const store, + const void** const state, + size_t* const state_len) +{ + int ret = 0; + + STORE_MUTEX_LOCK(&store->gtid_mtx); + + if (!store->snapshot) + { + size_t const memb_len = store->members_num * sizeof(member_t); + size_t const rec_len = store->records_num * STORE_RECORD_SIZE; + size_t const buf_len = WSREP_GTID_STR_LEN + 1 + + sizeof(uint32_t) + memb_len + + 1 /* read view support */ + + sizeof(uint32_t) + rec_len; + + store->snapshot = malloc(buf_len); + + if (store->snapshot) + { + char* ptr = store->snapshot; + + /* state GTID */ + ret = wsrep_gtid_print(&store->gtid, ptr, buf_len); + if (ret > 0) + { + NODE_INFO(""); + assert((size_t)ret < buf_len); + + ptr[ret] = '\0'; + ret++; + ptr += ret; + assert((size_t)ret < buf_len); + + /* membership */ + ptr += store_serialize_uint32(ptr, store->members_num); + ret += (int)sizeof(uint32_t); + assert((size_t)ret + memb_len < buf_len); + memcpy(ptr, store->members, memb_len); + ptr += memb_len; + ret += (int)memb_len; + assert((size_t)ret + sizeof(uint32_t) <= buf_len); + + /* read view support */ + ptr[0] = store->read_view_support; + ptr += 1; + ret += 1; + + /* records */ + ptr += store_serialize_uint32(ptr, store->records_num); + ret += (int)sizeof(uint32_t); + assert((size_t)ret + rec_len < buf_len); + memcpy(ptr, store->records, rec_len); + ret += (int)rec_len; + assert((size_t)ret <= buf_len); + } + else + { + NODE_ERROR("Failed to record GTID: %d (%s)", ret,strerror(-ret)); + free(store->snapshot); + store->snapshot = 0; + } + } + else + { + NODE_ERROR("Failed to allocate snapshot buffer of size %zu",buf_len); + ret = -ENOMEM; + } + } + else + { + assert(0); /* provider should prevent such situation */ + ret = -EAGAIN; + } + + pthread_mutex_unlock(&store->gtid_mtx); + + if (ret > 0) + { + NODE_INFO("\n\nPrepared snapshot of %u records\n\n", store->records_num); + *state = store->snapshot; + *state_len = (size_t)ret; + ret = 0; + } + + return ret; +} + +void +node_store_release_state(node_store_t* const store) +{ + STORE_MUTEX_LOCK(&store->gtid_mtx); + + assert(store->snapshot); + free(store->snapshot); + store->snapshot = 0; + + pthread_mutex_unlock(&store->gtid_mtx); +} + +int +node_store_update_membership(struct node_store* const store, + const wsrep_view_info_t* const v) +{ + assert(store); + assert(WSREP_VIEW_PRIMARY == v->status); + assert(v->memb_num > 0); + + STORE_MUTEX_LOCK(&store->gtid_mtx); + + bool const continuation = v->state_id.seqno == store->gtid.seqno + 1 && + 0 == wsrep_uuid_compare(&v->state_id.uuid, &store->gtid.uuid); + + bool const initialization = WSREP_SEQNO_UNDEFINED == store->gtid.seqno && + 0 == wsrep_uuid_compare(&WSREP_UUID_UNDEFINED, &store->gtid.uuid); + + if (!(continuation || initialization)) + { + char store_str[WSREP_GTID_STR_LEN + 1] = { 0, }; + wsrep_gtid_print(&store->gtid, store_str, sizeof(store_str)); + char view_str[WSREP_GTID_STR_LEN + 1] = { 0, }; + wsrep_gtid_print(&v->state_id, view_str, sizeof(view_str)); + + NODE_FATAL("Attempt to initialize store GTID from incompatible view:\n" + "\tstore: %s\n" + "\tview: %s", + store_str, view_str); + abort(); + } + + wsrep_uuid_t* const new_members = calloc(sizeof(wsrep_uuid_t), + (size_t)v->memb_num); + if (!new_members) + { + NODE_FATAL("Could not allocate new members array"); + abort(); + } + + int i; + for (i = 0; i < v->memb_num; i++) + { + new_members[i] = v->members[i].id; + } + + /* REPLICATION: at this point we should compare old and new memberships and + * rollback all streaming transactions from the partitioned + * members, if any. But we don't support it in this program yet. + */ + + free(store->members); + + store->members = new_members; + store->members_num = (uint32_t)v->memb_num; + store->gtid = v->state_id; + store->read_view_support = (v->capabilities & WSREP_CAP_SNAPSHOT); + + pthread_mutex_unlock(&store->gtid_mtx); + + return 0; +} + +void +node_store_gtid(struct node_store* const store, + wsrep_gtid_t* const gtid) +{ + assert(store); + + STORE_MUTEX_LOCK(&store->gtid_mtx); + + *gtid = store->gtid; + + pthread_mutex_unlock(&store->gtid_mtx); +} + + +static inline void +store_serialize_op(void* const buf, const struct store_trx_op* const op) +{ + char* ptr = buf; + ptr += store_record_set(ptr, 0, &op->rec_from); + ptr += store_record_set(ptr, 0, &op->rec_to); + ptr += store_serialize_uint32(ptr, op->idx_from); + ptr += store_serialize_uint32(ptr, op->idx_to); + ptr += store_serialize_uint32(ptr, op->new_value); + store_serialize_uint32(ptr, op->size); +} + +static inline void +store_deserialize_op(struct store_trx_op* const op, const void* const buf) +{ + const char* ptr = buf; + ptr += store_record_get(ptr, 0, &op->rec_from); + ptr += store_record_get(ptr, 0, &op->rec_to); + ptr += store_deserialize_uint32(&op->idx_from, ptr); + ptr += store_deserialize_uint32(&op->idx_to, ptr); + ptr += store_deserialize_uint32(&op->new_value, ptr); + store_deserialize_uint32(&op->size, ptr); +} + +static inline void +store_serialize_gtid(void* const buf, const wsrep_gtid_t* const gtid) +{ + char* ptr = buf; + memcpy(ptr, >id->uuid, sizeof(gtid->uuid)); + ptr += sizeof(gtid->uuid); + store_serialize_int64(ptr, gtid->seqno); +} + +static inline void +store_deserialize_gtid(wsrep_gtid_t* const gtid, const void* const buf) +{ + const char* ptr = buf; + memcpy(>id->uuid, ptr, sizeof(gtid->uuid)); + ptr += sizeof(gtid->uuid); + store_deserialize_int64(>id->seqno, ptr); +} + +#define STORE_GTID_SIZE (sizeof(((wsrep_gtid_t*)(NULL))->uuid) + sizeof(int64_t)) + +int +node_store_execute(node_store_t* const store, + wsrep_t* const wsrep, + wsrep_ws_handle_t* const ws_handle) +{ + assert(store); + + if (0 == ws_handle->trx_id) + { + assert(sizeof(ws_handle->trx_id) >= sizeof(uintptr_t)); + ws_handle->trx_id = store_new_trx_id(store); + } + + struct store_trx_ctx* trx = store_get_trx_ctx(store, ws_handle->trx_id); + if (store_trx_add_op(trx)) return -ENOMEM; + struct store_trx_op* const op = &trx->ops[trx->ops_num - 1]; + + STORE_MUTEX_LOCK(&store->gtid_mtx); + + if (1 == trx->ops_num) + { + /* First operation, save ID of the read view of the transaction */ + trx->rv_gtid = store->gtid; + } + + /* Transaction op: copy value from one random record to another... */ + op->idx_from = (uint32_t)rand() % store->records_num; + op->idx_to = (uint32_t)rand() % store->records_num; + store_record_get(store->records, op->idx_from, &op->rec_from); + store_record_get(store->records, op->idx_to, &op->rec_to); + + pthread_mutex_unlock(&store->gtid_mtx); + + wsrep_status_t ret = WSREP_TRX_FAIL; + + if (op->rec_from.version > trx->rv_gtid.seqno || + op->rec_to.version > trx->rv_gtid.seqno) + { + /* transaction read view changed, trx needs to be restarted */ +#if 0 + NODE_INFO("Transaction read view changed: %lld -> %lld, returning %d", + (long long)trx->rv_gtid.seqno, + (long long)(op->rec_from.version > op->rec_to.version ? + op->rec_from.version : op->rec_to.version), + ret); +#endif + goto error; + } + + /* Transaction op: ... and modify it somehow, e.g. increment by 1 */ + op->new_value = op->rec_from.value + 1; + + if (1 == trx->ops_num) // first trx operation + { + /* REPLICATION: Since this application does not implement record locks, + * it needs to establish read view for each transaction for + * a proper conflict detection and transaction isolation. + * Otherwose we'll need to implement record versioning */ + if (store->read_view_support) + { + ret = wsrep->assign_read_view(wsrep, ws_handle, &trx->rv_gtid); + if (ret) + { + NODE_ERROR("wsrep::assign_read_view(%lld) failed: %d", + trx->rv_gtid.seqno, ret); + goto error; + } + } + + /* Record read view in the writeset for debugging purposes */ + assert(store->op_size > STORE_GTID_SIZE); + store_serialize_gtid(trx + 1, &trx->rv_gtid); + wsrep_buf_t ws = { .ptr = trx + 1, .len = STORE_GTID_SIZE }; + ret = wsrep->append_data(wsrep, ws_handle, &ws, 1, WSREP_DATA_ORDERED, + true); + if (ret) + { + NODE_ERROR("wsrep::append_data(rv_gtid) failed: %d", ret); + goto error; + } + } + + /* REPLICATION: append keys touched by the operation + * + * NOTE: depending on data access granularity some applications may require + * multipart keys, e.g. <schema>:<table>:<row> in a SQL database. + * Single part keys match hashtables and key-value stores. + * Below we have two different single-part keys which reference two + * different records. */ + uint32_t key_val; + wsrep_buf_t key_part = { .ptr = &key_val, .len = sizeof(key_val) }; + wsrep_key_t ws_key = { .key_parts = &key_part, .key_parts_num = 1 }; + + /* REPLICATION: Key 1 - the key of the source, unchanged record */ + store_serialize_uint32(&key_val, op->idx_from); + ret = wsrep->append_key(wsrep, ws_handle, + &ws_key, + 1, /* single key */ + WSREP_KEY_REFERENCE, + true /* provider shall make a copy of the key */); + if (ret) + { + NODE_ERROR("wsrep::append_key(REFERENCE) failed: %d", ret); + goto error; + } + + /* REPLICATION: Key 2 - the key of the record we want to update */ + store_serialize_uint32(&key_val, op->idx_to); + ret = wsrep->append_key(wsrep, ws_handle, + &ws_key, + 1, /* single key */ + WSREP_KEY_UPDATE, + true /* provider shall make a copy of the key */); + if (ret) + { + NODE_ERROR("wsrep::append_key(UPDATE) failed: %d", ret); + goto error; + } + + /* REPLICATION: append transaction operation to the "writeset" + * (WS buffer was allocated together with trx context above) */ + assert(store->op_size >= STORE_OP_SIZE); + assert(store->op_size == (uint32_t)store->op_size); + op->size = (uint32_t)store->op_size; + store_serialize_op(trx + 1, op); + wsrep_buf_t ws = { .ptr = trx + 1, .len = store->op_size }; + ret = wsrep->append_data(wsrep, ws_handle, &ws, 1, WSREP_DATA_ORDERED, true); + + if (!ret) return 0; + + NODE_ERROR("wsrep::append_data(op) failed: %d", ret); + +error: + store_free_trx_id(store, ws_handle->trx_id); + + return ret; +} + +int +node_store_apply(node_store_t* const store, + wsrep_trx_id_t* const trx_id, + const wsrep_buf_t* const ws) +{ + assert(store); + (void)store; + + *trx_id = store_new_trx_id(store); + struct store_trx_ctx* const trx = store_get_trx_ctx(store, *trx_id); + + /* prepare trx context for commit */ + const char* ptr = ws->ptr; + size_t left = ws->len; + + /* at least one operation should be there */ + assert(left >= STORE_GTID_SIZE + STORE_OP_SIZE); + + if (left >= STORE_GTID_SIZE) + { + store_deserialize_gtid(&trx->rv_gtid, ptr); + left -= STORE_GTID_SIZE; + ptr += STORE_GTID_SIZE; + } + + while (left >= STORE_OP_SIZE) + { + if (store_trx_add_op(trx)) + { + store_free_trx_id(store,*trx_id); /* "rollback": release resources */ + return -ENOMEM; + } + struct store_trx_op* const op = &trx->ops[trx->ops_num - 1]; + + store_deserialize_op(op, ptr); + assert(op->idx_to <= store->records_num); + + left -= op->size; + ptr += op->size; + } + + if (left != 0) + { + NODE_FATAL("Failed to process last (%d/%zu) bytes of the writeset.", + (int)left, ws->len); + abort(); + } + + return 0; +} + +static uint32_t const store_fnv32_seed = 2166136261; + +static inline uint32_t +store_fnv32a(const void* buf, size_t const len, uint32_t seed) +{ + static uint32_t const fnv32_prime = 16777619; + const uint8_t* bp = (const uint8_t*)buf; + const uint8_t* const be = bp + len; + + while (bp < be) + { + seed ^= *bp++; + seed *= fnv32_prime; + } + + return seed; +} + + +static void +store_checksum_state(node_store_t* store) +{ + uint32_t res = store_fnv32_seed; + uint32_t i; + + for (i = 0; i < store->members_num; i++) + { + res = store_fnv32a(&store->members[i], sizeof(*store->members), res); + } + + res = store_fnv32a(store->records, store->records_num * STORE_RECORD_SIZE, + res); + + res = store_fnv32a(&store->gtid.uuid, sizeof(store->gtid.uuid), res); + + wsrep_seqno_t s; + store_serialize_int64(&s, store->gtid.seqno); + res = store_fnv32a(&s, sizeof(s), res); + + NODE_INFO("\n\n\tSeqno: %lld; state hash: %#010x\n", + (long long)store->gtid.seqno, res); +} + +static inline void +store_update_gtid(node_store_t* const store, const wsrep_gtid_t* ws_gtid) +{ + assert(0 == wsrep_uuid_compare(&store->gtid.uuid, &ws_gtid->uuid)); + + store->gtid.seqno++; + + if (store->gtid.seqno != ws_gtid->seqno) + { + NODE_FATAL("Out of order commit: expected %lld, got %lld", + store->gtid.seqno, ws_gtid->seqno); + abort(); + } + + static wsrep_seqno_t const period = 0x000fffff; /* ~1M */ + if (0 == (store->gtid.seqno & period)) + { + store_checksum_state(store); + } +} + +void +node_store_commit(node_store_t* const store, + wsrep_trx_id_t const trx_id, + const wsrep_gtid_t* const ws_gtid) +{ + assert(store); + assert(trx_id); + + struct store_trx_ctx* const trx = store_get_trx_ctx(store, trx_id); + + bool const check_read_view_snapshot = +#ifdef NDEBUG + !store->read_view_support; +#else + 1; +#endif /* NDEBUG */ + + STORE_MUTEX_LOCK(&store->gtid_mtx); + + store_update_gtid(store, ws_gtid); + + /* First loop is to check if we can commit all operations if provider + * does not support read view or for debugging puposes */ + size_t i; + if (check_read_view_snapshot) + { + for (i = 0; i < trx->ops_num; i++) + { + struct store_trx_op* const op = &trx->ops[i]; + + record_t from, to; + store_record_get(store->records, op->idx_from, &from); + store_record_get(store->records, op->idx_to, &to); + + if (!store_record_equal(&op->rec_from, &from) || + !store_record_equal(&op->rec_to, &to)) + { + /* read view changed since transaction was executed, + * can't commit */ + assert(op->rec_from.version <= from.version); + assert(op->rec_to.version <= to.version); + if (op->rec_from.version == from.version) + assert(op->rec_from.value == from.value); + if (op->rec_to.version == to.version) + assert(op->rec_to.value == to.value); + if (store->read_view_support) abort(); + + store->read_view_fails++; + + NODE_INFO("Read view changed at commit time, rollback trx"); + + goto error; + } + } + } + + /* Second loop is to actually modify the dataset */ + for (i = 0; i < trx->ops_num; i++) + { + struct store_trx_op* const op = &trx->ops[i]; + + record_t const new_record = + { .version = ws_gtid->seqno, .value = op->new_value }; + + store_record_set(store->records, op->idx_to, &new_record); + } + +error: + pthread_mutex_unlock(&store->gtid_mtx); + + store_free_trx_id(store, trx_id); +} + +void +node_store_rollback(node_store_t* const store, + wsrep_trx_id_t const trx_id) +{ + assert(store); + assert(trx_id); + + store_free_trx_id(store, trx_id); +} + +void +node_store_update_gtid(node_store_t* const store, + const wsrep_gtid_t* const ws_gtid) +{ + assert(store); + + STORE_MUTEX_LOCK(&store->gtid_mtx); + + store_update_gtid(store, ws_gtid); + + pthread_mutex_unlock(&store->gtid_mtx); +} + +long +node_store_read_view_failures(node_store_t* const store) +{ + assert(store); + + long ret; + + STORE_MUTEX_LOCK(&store->gtid_mtx); + + ret = store->read_view_fails;; + + pthread_mutex_unlock(&store->gtid_mtx); + + return ret; +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/store.h b/wsrep-lib/wsrep-API/v26/examples/node/store.h new file mode 100644 index 00000000..51da74d7 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/store.h @@ -0,0 +1,125 @@ +/* Copyright (c) 2019-2020, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines simple "transactional storage engine" interface + */ + +#ifndef NODE_STORE_H +#define NODE_STORE_H + +#include "options.h" + +#include "../../wsrep_api.h" + +typedef struct node_store node_store_t; + +/** + * open a store and optionally assocoate a file with it */ +extern node_store_t* +node_store_open(const struct node_options* opts); + +/** + * close store and deallocate associated resources */ +extern void +node_store_close(node_store_t* store); + +/** + * initialize store with a state */ +extern int +node_store_init_state(node_store_t* store, const void* state, size_t state_len); + +/** + * Return a pointer to state snapshot that is guaranteed to be unchanged + * until node_store_release_state() is called. + * + * @param[out] state pointer to state snapshot + * @param[out] state_len soze of state snapshot + */ +extern int +node_store_acquire_state(node_store_t* store, + const void** state, size_t* state_len); + +/** + * release state */ +extern void +node_store_release_state(node_store_t* store); + +/** + * inform store about new membership */ +extern int +node_store_update_membership(node_store_t* store, const wsrep_view_info_t* v); + +/** + * get the current GTID (last committed) */ +extern void +node_store_gtid(node_store_t* store, wsrep_gtid_t* gtid); + +/** + * execute and prepare local transaction in store and return its key and write + * set. + * + * This operation allocates resources that must be freed with either + * node_store_commit() or node_store_rollback() + * + * @param[in] wsrep provider handle + * @param[out] ws_handle reference to the resulting write set in the provider + */ +extern int +node_store_execute(node_store_t* store, + wsrep_t* wsrep, + wsrep_ws_handle_t* ws_handle); + +/** + * apply and prepare foreign write set received from replication + * + * This operation allocates resources that must be freed with either + * node_store_commit() or node_store_rollback() + * + * @param[out] trx_id locally unique transaction ID + * @param[in] ws foreign transaction write set + */ +extern int +node_store_apply(node_store_t* store, + wsrep_trx_id_t* trx_id, + const wsrep_buf_t* ws); + +/** + * commit prepared transaction identified by trx_id */ +extern void +node_store_commit(node_store_t* store, + wsrep_trx_id_t trx_id, + const wsrep_gtid_t* ws_gtid); + +/** + * rollback prepared transaction identified by trx_id */ +extern void +node_store_rollback(node_store_t* store, + wsrep_trx_id_t trx_id); + +/** + * update storage GTID for transactions that had to be skipped/rolled back */ +extern void +node_store_update_gtid(node_store_t* store, + const wsrep_gtid_t* ws_gtid); + +/** + * @return the number of store read view snapshot check failures at commit time. + * (should be zero if provider implements assign_read_view() call) */ +extern long +node_store_read_view_failures(node_store_t* store); + +#endif /* NODE_STORE_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/trx.c b/wsrep-lib/wsrep-API/v26/examples/node/trx.c new file mode 100644 index 00000000..afdcada4 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/trx.c @@ -0,0 +1,155 @@ +/* Copyright (c) 2019-2020, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "trx.h" +#include "log.h" + +#include <assert.h> +#include <errno.h> // ENOMEM, etc. +#include <stdbool.h> + +wsrep_status_t +node_trx_execute(node_store_t* const store, + wsrep_t* const wsrep, + wsrep_conn_id_t const conn_id, + int ops_num) +{ + wsrep_status_t cert = WSREP_OK; // for cleanup + + static unsigned int const ws_flags = + WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END; // atomic trx + wsrep_trx_meta_t ws_meta; + wsrep_status_t ret = WSREP_OK; + + /* prepare simple transaction and obtain a writeset handle for it */ + wsrep_ws_handle_t ws_handle = { 0, NULL }; + while (ops_num--) + { + if (0 != (ret = node_store_execute(store, wsrep, &ws_handle))) + { +#if 0 + NODE_INFO("master [%d]: node_store_execute() returned %d", + conn_id, ret); +#endif + ret = WSREP_TRX_FAIL; + goto cleanup; + } + } + + /* REPLICATION: (replicate and) certify the writeset (pointed to by + * ws_handle) with the cluster */ + cert = wsrep->certify(wsrep, conn_id, &ws_handle, ws_flags, &ws_meta); + + if (WSREP_BF_ABORT == cert) + { + /* REPLICATION: transaction was signaled to abort due to multi-master + * conflict. It must rollback immediately: it blocks + * transaction that was ordered earlier and will never + * be able to enter commit order. */ + node_store_rollback(store, ws_handle.trx_id); + } + + /* REPLICATION: writeset was totally ordered, need to enter commit order */ + if (ws_meta.gtid.seqno > 0) + { + ret = wsrep->commit_order_enter(wsrep, &ws_handle, &ws_meta); + if (ret) + { + NODE_ERROR("master [%d]: wsrep::commit_order_enter(%lld) failed: " + "%d", (long long)(ws_meta.gtid.seqno), ret); + goto cleanup; + } + + /* REPLICATION: inside commit monitor + * Note: we commit transaction only if certification succeded */ + if (WSREP_OK == cert) + node_store_commit(store, ws_handle.trx_id, &ws_meta.gtid); + else + node_store_update_gtid(store, &ws_meta.gtid); + + ret = wsrep->commit_order_leave(wsrep, &ws_handle, &ws_meta, NULL); + if (ret) + { + NODE_ERROR("master [%d]: wsrep::commit_order_leave(%lld) failed: " + "%d", (long long)(ws_meta.gtid.seqno), ret); + goto cleanup; + } + } + else + { + assert(cert); + } + +cleanup: + /* REPLICATION: if wsrep->certify() returned anything else but WSREP_OK + * transaction must roll back. BF aborted trx already did it. */ + if (cert && WSREP_BF_ABORT != cert) + node_store_rollback(store, ws_handle.trx_id); + + /* NOTE: this application follows the approach that resources must be freed + * at the same level where they were allocated, so it is assumed that + * ws_key and ws were deallocated in either commit or rollback calls.*/ + + /* REPLICATION: release provider resources associated with the trx */ + wsrep->release(wsrep, &ws_handle); + + return ret ? ret : cert; +} + +wsrep_status_t +node_trx_apply(node_store_t* const store, + wsrep_t* const wsrep, + const wsrep_ws_handle_t* const ws_handle, + const wsrep_trx_meta_t* const ws_meta, + const wsrep_buf_t* const ws) +{ + /* no business being here if event was not ordered */ + assert(ws_meta->gtid.seqno > 0); + + wsrep_trx_id_t trx_id; + wsrep_buf_t err_buf = { NULL, 0 }; + int app_err; + if (ws) + { + app_err = node_store_apply(store, &trx_id, ws); + if (app_err) + { + /* REPLICATION: if applying failed, prepare an error buffer with + * sufficient error specification */ + err_buf.ptr = &app_err; // suppose error code is enough + err_buf.len = sizeof(app_err); + } + } + else /* ws failed certification and should be skipped */ + { + /* just some non-0 code to choose node_store_update_gtid() below */ + app_err = 1; + } + + wsrep_status_t ret; + ret = wsrep->commit_order_enter(wsrep, ws_handle, ws_meta); + if (ret) { + node_store_rollback(store, trx_id); + return ret; + } + + if (!app_err) node_store_commit(store, trx_id, &ws_meta->gtid); + else node_store_update_gtid(store, &ws_meta->gtid); + + ret = wsrep->commit_order_leave(wsrep, ws_handle, ws_meta, &err_buf); + + return ret; +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/trx.h b/wsrep-lib/wsrep-API/v26/examples/node/trx.h new file mode 100644 index 00000000..e1d763a1 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/trx.h @@ -0,0 +1,50 @@ +/* Copyright (c) 2019-2020, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines "transaction" interface + */ + +#ifndef NODE_TRX_H +#define NODE_TRX_H + +#include "store.h" + +#include "../../wsrep_api.h" + +/** + * executes and replicates local transaction + */ +extern wsrep_status_t +node_trx_execute(node_store_t* store, + wsrep_t* wsrep, + wsrep_conn_id_t conn_id, + int ops_num); + +/** + * applies and commits slave write set + * + * @param ws replicated event writeset. NULL if it failed certification (and so + * must be skipped, but it was ordered, so store GTID must be updated) + */ +extern wsrep_status_t +node_trx_apply(node_store_t* store, + wsrep_t* wsrep, + const wsrep_ws_handle_t* ws_handle, + const wsrep_trx_meta_t* ws_meta, + const wsrep_buf_t* ws); + +#endif /* NODE_TRX_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/worker.c b/wsrep-lib/wsrep-API/v26/examples/node/worker.c new file mode 100644 index 00000000..e9901ad8 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/worker.c @@ -0,0 +1,197 @@ +/* Copyright (c) 2019-2020, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "worker.h" + +#include "log.h" +#include "options.h" +#include "trx.h" +#include "wsrep.h" + +#include <assert.h> +#include <pthread.h> +#include <stdbool.h> +#include <string.h> // strerror() + +struct node_worker +{ + struct node_ctx* node; + pthread_t thread_id; + size_t id; + bool exit; +}; + +enum wsrep_cb_status +node_worker_apply_cb(void* const recv_ctx, + const wsrep_ws_handle_t* const ws_handle, + uint32_t const ws_flags, + const wsrep_buf_t* const ws, + const wsrep_trx_meta_t* const ws_meta, + wsrep_bool_t* const exit_loop) +{ + assert(recv_ctx); + + struct node_worker* const worker = recv_ctx; + + wsrep_status_t const ret = node_trx_apply( + worker->node->store, + node_wsrep_provider(worker->node->wsrep), + ws_handle, + ws_meta, + ws_flags & WSREP_FLAG_ROLLBACK ? NULL : ws); + + *exit_loop = worker->exit; + + return WSREP_OK == ret ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE; +} + + +static void* +worker_slave(void* recv_ctx) +{ + struct node_worker* const worker = recv_ctx; + wsrep_t* const wsrep = node_wsrep_provider(worker->node->wsrep); + + wsrep_status_t const ret = wsrep->recv(wsrep, worker); + + if (WSREP_OK != ret) + { + NODE_ERROR("slave worker [%zu] exited with error %d.", worker->id, ret); + } + + return NULL; +} + +static void* +worker_master(void* send_ctx) +{ + struct node_worker* const worker = send_ctx; + struct node_ctx* const node = worker->node; + wsrep_t* const wsrep = node_wsrep_provider(node->wsrep); + + assert(node->opts->ws_size > 0); + + wsrep_status_t ret; + + do + { + /* REPLICATION: we should not perform any local writes until the node + * is synced with the cluster. */ + if (!node_wsrep_wait_synced(node->wsrep)) + { + NODE_ERROR("master worker [%zu] failed waiting for SYNCED state.", + worker->id); + break; + } + + /* REPLICATION: the node is now synced */ + + do + { + ret = node_trx_execute(node->store, + wsrep, + worker->id, + (int)node->opts->operations); + } + while(WSREP_OK == ret // success + || (WSREP_TRX_FAIL == ret // certification failed, trx rolled back + && (usleep(10000),true)) // retry after short sleep + ); + } + while (WSREP_CONN_FAIL == ret); // provider in bad state (e.g. non-Primary) + + return NULL; +} + +struct node_worker_pool +{ + size_t size; // size of the pool (nu,ber of nodes) + struct node_worker worker[1]; // worker context array; +}; + +struct node_worker_pool* +node_worker_start(struct node_ctx* const ctx, + node_worker_type_t const type, + size_t const size) +{ + assert(ctx); + + if (0 == size) return NULL; + + const char* const type_str = type == NODE_WORKER_SLAVE ? "slave" : "master"; + + size_t const alloc_size = + sizeof(struct node_worker_pool) + + sizeof(struct node_worker) * (size - 1); + + struct node_worker_pool* const ret = malloc(alloc_size); + + if (ret) + { + void* (* const routine) (void*) = + type == NODE_WORKER_SLAVE ? worker_slave : worker_master; + + size_t i; + for (i = 0; i < size; i++) + { + struct node_worker* const worker = &ret->worker[i]; + worker->node = ctx; + worker->id = i; + worker->exit = false; + + int const err = pthread_create(&worker->thread_id, + NULL, + routine, + worker); + if (err) + { + NODE_ERROR("Failed to start %s worker[%zu]: %d (%s)", + type_str, i, err, strerror(err)); + if (0 == i) + { + free(ret); + return NULL; + } + else + { + break; // some threads have started, + // need to return to close them first + } + } + } + + ret->size = i; + } + else + { + NODE_ERROR("Failed to allocate %zu bytes for the %s worker pool", + alloc_size, type_str); + } + + return ret; +} + +void +node_worker_stop(struct node_worker_pool* pool) +{ + size_t i; + for (i = 0; pool && i < pool->size; i++) + { + pthread_join(pool->worker[i].thread_id, NULL); + } + + free(pool); +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/worker.h b/wsrep-lib/wsrep-API/v26/examples/node/worker.h new file mode 100644 index 00000000..7ae06423 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/worker.h @@ -0,0 +1,66 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines worker thread interface + */ + +#ifndef NODE_WORKER_H +#define NODE_WORKER_H + +#include "ctx.h" + +#include "../../wsrep_api.h" + +/** + * REPLICATION: a callback to apply and commit slave replication events */ +extern enum wsrep_cb_status +node_worker_apply_cb(void* recv_ctx, + const wsrep_ws_handle_t* ws_handle, + uint32_t ws_flags, + const wsrep_buf_t* ws, + const wsrep_trx_meta_t* ws_meta, + wsrep_bool_t* exit_loop); + +typedef enum node_worker_type +{ + NODE_WORKER_SLAVE, + NODE_WORKER_MASTER +} + node_worker_type_t; + +struct node_worker_pool; + +/** + * Starts the required number of workier threads of a given type + * + * @param[in] ctx application context + * @param[in] type of a worker + * @param[in] number of workers + * + * @return worker pool handle + */ +extern struct node_worker_pool* +node_worker_start(struct node_ctx* ctx, + node_worker_type_t type, + size_t number); + +/** + * Stops workers in a pool and deallocates respective resources */ +extern void +node_worker_stop(struct node_worker_pool* pool); + +#endif /* NODE_WORKER_H */ diff --git a/wsrep-lib/wsrep-API/v26/examples/node/wsrep.c b/wsrep-lib/wsrep-API/v26/examples/node/wsrep.c new file mode 100644 index 00000000..6cea6d90 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/wsrep.c @@ -0,0 +1,479 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "wsrep.h" + +#include "log.h" +#include "sst.h" +#include "store.h" +#include "worker.h" + +#include <assert.h> +#include <stdio.h> // snprintf() +#include <stdlib.h> // abort() +#include <string.h> // strcasecmp() + +struct node_wsrep +{ + wsrep_t* instance; // wsrep provider instance + + struct wsrep_view + { + pthread_mutex_t mtx; + wsrep_gtid_t state_id; + wsrep_view_status_t status; + wsrep_cap_t capabilities; + int proto_ver; + int memb_num; + int my_idx; + wsrep_member_info_t* members; + } + view; + + struct + { + pthread_mutex_t mtx; + pthread_cond_t cond; + int value; + } + synced; + + bool bootstrap; // shall this node bootstrap a primary view? +}; + +static struct node_wsrep s_wsrep = +{ + .instance = NULL, + .view = + { + .mtx = PTHREAD_MUTEX_INITIALIZER, + .state_id = {{{ 0, }}, WSREP_SEQNO_UNDEFINED }, + .status = WSREP_VIEW_DISCONNECTED, + .capabilities = 0, + .proto_ver = -1, + .memb_num = 0, + .my_idx = -1, + .members = NULL + }, + .synced = + { + .mtx = PTHREAD_MUTEX_INITIALIZER, + .cond = PTHREAD_COND_INITIALIZER, + .value = 0 + }, + .bootstrap = false +}; + +static const char* wsrep_view_status_str[WSREP_VIEW_MAX] = +{ + "PRIMARY", + "NON-PRIMARY", + "DISCONNECTED" +}; + +#define WSREP_CAPABILITIES_MAX ((int)sizeof(wsrep_cap_t) * 8) // bitmask +static const char* wsrep_capabilities_str[WSREP_CAPABILITIES_MAX] = +{ + "MULTI-MASTER", + "CERTIFICATION", + "PA", + "REPLAY", + "TOI", + "PAUSE", + "CAUSAL-READS", + "CAUSAL-TRX", + "INCREMENTAL", + "SESSION-LOCKS", + "DISTRIBUTED-LOCKS", + "CONSISTENCY-CHECK", + "UNORDERED", + "ANNOTATION", + "PREORDERED", + "STREAMING", + "SNAPSHOT", + "NBO", + NULL, +}; + +/** + * REPLICATION: callback is called by provider when the node connects to group. + * This happens out-of-order, before the node receives a state + * transfer and syncs with the cluster. Unless application requires + * it it can be empty. We however want to know the GTID of the + * group out of order for SST tricks, so we record it out of order. + */ +static enum wsrep_cb_status +wsrep_connected_cb(void* const x, + const wsrep_view_info_t* const v) +{ + char gtid_str[WSREP_GTID_STR_LEN + 1]; + wsrep_gtid_print(&v->state_id, gtid_str, sizeof(gtid_str)); + + NODE_INFO("connect_cb(): Connected at %s to %s group of %d member(s)", + gtid_str, wsrep_view_status_str[v->status], v->memb_num); + + struct node_wsrep* const wsrep = ((struct node_ctx*)x)->wsrep; + + if (pthread_mutex_lock(&wsrep->view.mtx)) + { + NODE_FATAL("Failed to lock VIEW mutex"); + abort(); + } + + wsrep->view.state_id = v->state_id; + + pthread_mutex_unlock(&wsrep->view.mtx); + + return WSREP_CB_SUCCESS; +} + +/** + * logs view data */ +static void +wsrep_log_view(const struct wsrep_view* v) +{ + char gtid[WSREP_GTID_STR_LEN + 1]; + wsrep_gtid_print(&v->state_id, gtid, sizeof(gtid)); + gtid[WSREP_GTID_STR_LEN] = '\0'; + + char caps[256]; + int written = 0; + size_t space_left = sizeof(caps); + int i; + for (i = 0; i < WSREP_CAPABILITIES_MAX && space_left > 0; i++) + { + wsrep_cap_t const f = 1u << i; + + if (!(f & v->capabilities)) continue; + + if (wsrep_capabilities_str[i]) + { + written += snprintf(&caps[written], space_left, "%s|", + wsrep_capabilities_str[i]); + } + else + { + written += snprintf(&caps[written], space_left, "%d|", i); + } + + space_left = sizeof(caps) - (size_t)written; + } + caps[written ? written - 1 : 0] = '\0'; // overwrite last '|' + + char members_list[1024]; + written = 0; + space_left = sizeof(members_list); + for (i = 0; i < v->memb_num && space_left > 0; i++) + { + wsrep_member_info_t* m = &v->members[i]; + char uuid[WSREP_UUID_STR_LEN + 1]; + wsrep_uuid_print(&m->id, uuid, sizeof(uuid)); + uuid[WSREP_UUID_STR_LEN] = '\0'; + + written += snprintf(&members_list[written], space_left, + "%s%d: %s '%s' incoming:'%s'\n", + v->my_idx == i ? " * " : " ", i, + uuid, m->name, m->incoming); + + space_left = sizeof(members_list) - (size_t)written; + } + members_list[written ? written - 1 : 0] = '\0'; // overwrite the last '\n' + + NODE_INFO( + "New view received:\n" + "state: %s (%s)\n" + "capabilities: %s\n" + "protocol version: %d\n" + "members(%d)%s%s", + gtid, wsrep_view_status_str[v->status], + caps, + v->proto_ver, + v->memb_num, v->memb_num ? ":\n" : "", members_list); +} + +/** + * REPLICATION: callback is called when the node needs to process cluster + * view change. The callback is called in "total order isolation", + * so all the preceding replication events will be processed + * strictly before the call and all subsequent - striclty after. + */ +static enum wsrep_cb_status +wsrep_view_cb(void* const x, + void* const r, + const wsrep_view_info_t* const v, + const char* const state, + size_t const state_len) +{ + (void)r; + (void)state; + (void)state_len; + + struct node_ctx* const node = x; + + if (WSREP_VIEW_PRIMARY == v->status) + { + /* REPLICATION: membership change is a totally ordered event and as such + * should be a part of the state, like changes to the + * database. */ + int err = node_store_update_membership(node->store, v); + if (err) + { + NODE_FATAL("Failed to update membership in store: %d (%s)", + err, strerror(-err)); + abort(); + } + } + + enum wsrep_cb_status ret = WSREP_CB_SUCCESS; + struct node_wsrep* const wsrep = ((struct node_ctx*)x)->wsrep; + + if (pthread_mutex_lock(&wsrep->view.mtx)) + { + NODE_FATAL("Failed to lock VIEW mutex"); + abort(); + } + + /* below we'll just copy the data for future reference (if need be): */ + + size_t const memb_size = (size_t)v->memb_num * sizeof(wsrep_member_info_t); + void* const tmp = realloc(wsrep->view.members, memb_size); + if (memb_size > 0 && !tmp) + { + NODE_ERROR("Could not allocate memory for a new view: %zu bytes", + memb_size); + ret = WSREP_CB_FAILURE; + goto cleanup; + } + else + { + wsrep->view.members = tmp; + if (memb_size) memcpy(wsrep->view.members, &v->members[0], memb_size); + } + + wsrep->view.state_id = v->state_id; + wsrep->view.status = v->status; + wsrep->view.capabilities = v->capabilities; + wsrep->view.proto_ver = v->proto_ver; + wsrep->view.memb_num = v->memb_num; + wsrep->view.my_idx = v->my_idx; + + /* and now log the info */ + + wsrep_log_view(&wsrep->view); + +cleanup: + pthread_mutex_unlock(&wsrep->view.mtx); + + return ret; +} + +/** + * REPLICATION: callback is called by provider when the node becomes SYNCED */ +static enum wsrep_cb_status +wsrep_synced_cb(void* const x) +{ + struct node_wsrep* const wsrep = ((struct node_ctx*)x)->wsrep; + + if (pthread_mutex_lock(&wsrep->synced.mtx)) + { + NODE_FATAL("Failed to lock SYNCED mutex"); + abort(); + } + + if (wsrep->synced.value == 0) + { + NODE_INFO("become SYNCED"); + wsrep->synced.value = 1; + pthread_cond_broadcast(&wsrep->synced.cond); + } + + pthread_mutex_unlock(&wsrep->synced.mtx); + + return WSREP_CB_SUCCESS; +} + +struct node_wsrep* +node_wsrep_init(const struct node_options* const opts, + const wsrep_gtid_t* const current_gtid, + void* const app_ctx) +{ + if (s_wsrep.instance != NULL) return NULL; // already initialized + + wsrep_status_t err; + err = wsrep_load(opts->provider, &s_wsrep.instance, node_log_cb); + if (WSREP_OK != err) + { + if (strcasecmp(opts->provider, WSREP_NONE)) + { + NODE_ERROR("wsrep_load(%s) failed: %s (%d).", + opts->provider, strerror(err), err); + } + else + { + NODE_ERROR("Initializing dummy provider failed: %s (%d).", + strerror(err), err); + } + return NULL; + } + + char base_addr[256]; + snprintf(base_addr, sizeof(base_addr) - 1, "%s:%ld", + opts->base_host, opts->base_port); + + struct wsrep_init_args args = + { + .app_ctx = app_ctx, + + .node_name = opts->name, + .node_address = base_addr, + .node_incoming = "", // we don't accept client connections + .data_dir = opts->data_dir, + .options = opts->options, + .proto_ver = 0, // this is the first version of the application + // so the first version of the writeset protocol + .state_id = current_gtid, + .state = NULL, // unused + + .logger_cb = node_log_cb, + .connected_cb = wsrep_connected_cb, + .view_cb = wsrep_view_cb, + .synced_cb = wsrep_synced_cb, + .encrypt_cb = NULL, // not implemented ATM + + .apply_cb = node_worker_apply_cb, + .unordered_cb = NULL, // not needed now + + .sst_request_cb = node_sst_request_cb, + .sst_donate_cb = node_sst_donate_cb + }; + + wsrep_t* wsrep = s_wsrep.instance; + + err = wsrep->init(wsrep, &args); + + if (WSREP_OK != err) + { + NODE_ERROR("wsrep::init() failed: %d, must shutdown", err); + node_wsrep_close(&s_wsrep); + return NULL; + } + + return &s_wsrep; +} + +wsrep_status_t +node_wsrep_connect(struct node_wsrep* const wsrep, + const char* const address, + bool const bootstrap) +{ + wsrep->bootstrap = bootstrap; + wsrep_status_t err = wsrep->instance->connect(wsrep->instance, + "wsrep_cluster", + address, + NULL, + wsrep->bootstrap); + + if (WSREP_OK != err) + { + NODE_ERROR("wsrep::connect(%s) failed: %d, must shutdown", + address, err); + node_wsrep_close(wsrep); + } + + return err; +} + +void +node_wsrep_disconnect(struct node_wsrep* const wsrep) +{ + if (pthread_mutex_lock(&wsrep->synced.mtx)) + { + NODE_FATAL("Failed to lock SYNCED mutex"); + abort(); + } + wsrep->synced.value = -1; /* this will signal master threads to exit */ + pthread_cond_broadcast(&wsrep->synced.cond); + pthread_mutex_unlock(&wsrep->synced.mtx); + + wsrep_status_t const err = wsrep->instance->disconnect(wsrep->instance); + + if (err) + { + /* REPLICATION: unless connection is not closed, slave threads will + * never return. */ + NODE_FATAL("Failed to close wsrep connection: %d", err); + abort(); + } +} + +void +node_wsrep_close(struct node_wsrep* const wsrep) +{ + if (pthread_mutex_lock(&wsrep->view.mtx)) + { + NODE_FATAL("Failed to lock VIEW mutex"); + abort(); + } + assert(0 == wsrep->view.memb_num); // the node must be disconneted + assert(NULL == wsrep->view.members); + free(wsrep->view.members); + wsrep->view.members = NULL; + pthread_mutex_unlock(&wsrep->view.mtx); + + wsrep->instance->free(wsrep->instance); + wsrep->instance = NULL; +} + +bool +node_wsrep_wait_synced(struct node_wsrep* const wsrep) +{ + if (pthread_mutex_lock(&wsrep->synced.mtx)) + { + NODE_FATAL("Failed to lock SYNCED mutex"); + abort(); + } + + while (wsrep->synced.value == 0) + { + pthread_cond_wait(&wsrep->synced.cond, &wsrep->synced.mtx); + } + + bool const ret = wsrep->synced.value > 0; + + pthread_mutex_unlock(&wsrep->synced.mtx); + + return ret; +} + +void +node_wsrep_connected_gtid(struct node_wsrep* wsrep, wsrep_gtid_t* gtid) +{ + if (pthread_mutex_lock(&wsrep->view.mtx)) + { + NODE_FATAL("Failed to lock VIEW mutex"); + abort(); + } + + *gtid = wsrep->view.state_id; + + pthread_mutex_unlock(&wsrep->view.mtx); +} + +wsrep_t* +node_wsrep_provider(struct node_wsrep* wsrep) +{ + return wsrep->instance; +} diff --git a/wsrep-lib/wsrep-API/v26/examples/node/wsrep.h b/wsrep-lib/wsrep-API/v26/examples/node/wsrep.h new file mode 100644 index 00000000..75c7eac3 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/examples/node/wsrep.h @@ -0,0 +1,92 @@ +/* Copyright (c) 2019, Codership Oy. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file This unit defines various helpers to manage wsrep provider + */ + +#ifndef NODE_WSREP_H +#define NODE_WSREP_H + +#include "options.h" + +#include "../../wsrep_api.h" + +#include <pthread.h> +#include <stdbool.h> + +typedef struct node_wsrep node_wsrep_t; + +/** + * loads and initializes wsrep provider for further usage + * + * @param[in] opts program options + * @param[in] current_gtid GTID corresponding to the current node state + * @param[in] app_ctx application context to be passed to callbacks + * + * @return initialized object pointer + */ +extern node_wsrep_t* +node_wsrep_init(const struct node_options* opts, + const wsrep_gtid_t* current_gtid, + void* app_ctx); + +/** + * connects to primary component + * + * @param[in] wsrep wsrep context + * @param[in] address address to connect at (provider specific) + * @param[in] bootsstrap bootstrap primary component if there's none + * + * @return wsrep status code + */ +extern wsrep_status_t +node_wsrep_connect(node_wsrep_t* wsrep, + const char* address, + bool bootstrap); + +/** + * disconnects from primary component + */ +extern void +node_wsrep_disconnect(node_wsrep_t* wsrep); + +/** + * deinitializes and unloads wsrep provider + */ +extern void +node_wsrep_close(node_wsrep_t* wsrep); + +/** + * waits for the node to become SYNCED + * + * @return true if node is synced, false in any other event. + */ +extern bool +node_wsrep_wait_synced(node_wsrep_t* wsrep); + +/** + * @param[in] wsrep context + * @param[out] gtid of the current view */ +extern void +node_wsrep_connected_gtid(node_wsrep_t* wsrep, wsrep_gtid_t* gtid); + +/** + * @return wsrep provider instance */ +extern wsrep_t* +node_wsrep_provider(node_wsrep_t* wsrep); + +#endif /* NODE_WSREP_H */ diff --git a/wsrep-lib/wsrep-API/v26/wsrep.xcf b/wsrep-lib/wsrep-API/v26/wsrep.xcf Binary files differnew file mode 100644 index 00000000..54108c6b --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/wsrep.xcf diff --git a/wsrep-lib/wsrep-API/v26/wsrep_allowlist_service.h b/wsrep-lib/wsrep-API/v26/wsrep_allowlist_service.h new file mode 100644 index 00000000..71f4d7e9 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/wsrep_allowlist_service.h @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2021 Codership Oy <info@codership.com> + * + * This file is part of wsrep-API. + * + * Wsrep-API is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-API is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-API. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file wsrep_allowlist_service.h + * + * This file defines interface for connection allowlist checks. + * + * The provider which is capable of using the service interface v1 must + * export the following functions. + * + * int wsrep_init_allowlist_service_v1(wsrep_allowlist_service_v1_t*) + * void wsrep_deinit_allowlist_service_v1() + * + * which can be probed by the application. + * + * The application must initialize the service via above init function + * before the provider is initialized via wsrep->init(). The deinit + * function must be called after the provider side resources have been + * released via wsrep->free(). + */ + +#ifndef WSREP_ALLOWLIST_SERVICE_H +#define WSREP_ALLOWLIST_SERVICE_H + +#include "wsrep_api.h" + +#ifdef __cplusplus +extern "C" +{ +#endif /* __cplusplus */ + +/** + * Type tag for application defined allowlist processing context. + * + * Application may pass pointer to the context when initializing + * the allowlist service. This pointer is passed a first parameter for + * each service call. + */ +typedef struct wsrep_allowlist_context wsrep_allowlist_context_t; + +typedef enum +{ + WSREP_ALLOWLIST_KEY_IP = 0, // IP allowlist check + WSREP_ALLOWLIST_KEY_SSL // SSL certificate allowlist check +} wsrep_allowlist_key_t; + +/* + * Allowlist connection check callback. + * + * @retval WSREP_OK connection allowed + * @retval WSREP_NOT_ALLOWED connection not allowed + */ +typedef wsrep_status_t (*wsrep_allowlist_cb_t)( + wsrep_allowlist_context_t*, + wsrep_allowlist_key_t key, + const wsrep_buf_t* value); + +/** + * Allowlist service struct. + * + * A pointer to this struct must be passed to the call to + * wsrep_init_allowlist_service_v1. + * + * The application must provide implementation to all functions defined + * in this struct. + */ +typedef struct wsrep_allowlist_service_v1_st +{ + /* Allowlist check callback */ + wsrep_allowlist_cb_t allowlist_cb; + /* Pointer to application defined allowlist context. */ + wsrep_allowlist_context_t* context; +} wsrep_allowlist_service_v1_t; + +#ifdef __cplusplus +} +#endif /* __cplusplus */ + + +#define WSREP_ALLOWLIST_SERVICE_INIT_FUNC_V1 "wsrep_init_allowlist_service_v1" +#define WSREP_ALLOWLIST_SERVICE_DEINIT_FUNC_V1 "wsrep_deinit_allowlist_service_v1" + +#endif /* WSREP_ALLOWLIST_SERVICE_H */ + diff --git a/wsrep-lib/wsrep-API/v26/wsrep_api.h b/wsrep-lib/wsrep-API/v26/wsrep_api.h new file mode 100644 index 00000000..59bb71da --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/wsrep_api.h @@ -0,0 +1,1380 @@ +/* Copyright (C) 2009-2013 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +/*! + @file wsrep API declaration. + + HOW TO READ THIS FILE. + + Due to C language rules this header layout doesn't lend itself to intuitive + reading. So here's the scoop: in the end this header declares two main types: + + * struct wsrep_init_args + + and + + * struct wsrep + + wsrep_init_args contains initialization parameters for wsrep provider like + names, addresses, etc. and pointers to callbacks. The callbacks will be called + by provider when it needs to do something application-specific, like log a + message or apply a writeset. It should be passed to init() call from + wsrep API. It is an application part of wsrep API contract. + + struct wsrep is the interface to wsrep provider. It contains all wsrep API + calls. It is a provider part of wsrep API contract. + + Finally, wsrep_load() method loads (dlopens) wsrep provider library. It is + defined in wsrep_loader.c unit and is part of libwsrep.a (which is not a + wsrep provider, but a convenience library). + + wsrep_unload() does the reverse. + +*/ +#ifndef WSREP_H +#define WSREP_H + +#include <stdint.h> +#include <stdbool.h> +#include <stdlib.h> +#include <unistd.h> +#include <time.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/************************************************************************** + * * + * wsrep replication API * + * * + **************************************************************************/ + +#define WSREP_INTERFACE_VERSION "26" + +/*! Empty backend spec */ +#define WSREP_NONE "none" + + +/*! + * @brief log severity levels, passed as first argument to log handler + */ +typedef enum wsrep_log_level +{ + WSREP_LOG_FATAL, //!< Unrecoverable error, application must quit. + WSREP_LOG_ERROR, //!< Operation failed, must be repeated. + WSREP_LOG_WARN, //!< Unexpected condition, but no operational failure. + WSREP_LOG_INFO, //!< Informational message. + WSREP_LOG_DEBUG //!< Debug message. Shows only of compiled with debug. +} wsrep_log_level_t; + +/*! + * @brief error log handler + * + * All messages from wsrep provider are directed to this + * handler, if present. + * + * @param level log level + * @param message log message + */ +typedef void (*wsrep_log_cb_t)(wsrep_log_level_t, const char *); + + +/*! + * Certain provider capabilities application may want to know about + */ +#define WSREP_CAP_MULTI_MASTER ( 1ULL << 0 ) +#define WSREP_CAP_CERTIFICATION ( 1ULL << 1 ) +#define WSREP_CAP_PARALLEL_APPLYING ( 1ULL << 2 ) +#define WSREP_CAP_TRX_REPLAY ( 1ULL << 3 ) +#define WSREP_CAP_ISOLATION ( 1ULL << 4 ) +#define WSREP_CAP_PAUSE ( 1ULL << 5 ) +#define WSREP_CAP_CAUSAL_READS ( 1ULL << 6 ) +#define WSREP_CAP_CAUSAL_TRX ( 1ULL << 7 ) +#define WSREP_CAP_INCREMENTAL_WRITESET ( 1ULL << 8 ) +#define WSREP_CAP_SESSION_LOCKS ( 1ULL << 9 ) +#define WSREP_CAP_DISTRIBUTED_LOCKS ( 1ULL << 10 ) +#define WSREP_CAP_CONSISTENCY_CHECK ( 1ULL << 11 ) +#define WSREP_CAP_UNORDERED ( 1ULL << 12 ) +#define WSREP_CAP_ANNOTATION ( 1ULL << 13 ) +#define WSREP_CAP_PREORDERED ( 1ULL << 14 ) +#define WSREP_CAP_STREAMING ( 1ULL << 15 ) +#define WSREP_CAP_SNAPSHOT ( 1ULL << 16 ) +#define WSREP_CAP_NBO ( 1ULL << 17 ) + +typedef uint32_t wsrep_cap_t; //!< capabilities bitmask + +/*! + * Writeset flags + * + * TRX_END the writeset and all preceding writesets must be committed + * ROLLBACK all preceding writesets in a transaction must be rolled back + * ISOLATION the writeset must be applied AND committed in isolation + * PA_UNSAFE the writeset cannot be applied in parallel + * COMMUTATIVE the order in which the writeset is applied does not matter + * NATIVE the writeset contains another writeset in this provider format + * + * TRX_START shall be set on the first trx fragment by provider + * TRX_PREPARE shall be set on the fragment which prepares the transaction + * + * Note that some of the flags are mutually exclusive (e.g. TRX_END and + * ROLLBACK). + */ +#define WSREP_FLAG_TRX_END ( 1ULL << 0 ) +#define WSREP_FLAG_ROLLBACK ( 1ULL << 1 ) +#define WSREP_FLAG_ISOLATION ( 1ULL << 2 ) +#define WSREP_FLAG_PA_UNSAFE ( 1ULL << 3 ) +#define WSREP_FLAG_COMMUTATIVE ( 1ULL << 4 ) +#define WSREP_FLAG_NATIVE ( 1ULL << 5 ) +#define WSREP_FLAG_TRX_START ( 1ULL << 6 ) +#define WSREP_FLAG_TRX_PREPARE ( 1ULL << 7 ) +#define WSREP_FLAG_SNAPSHOT ( 1ULL << 8 ) +#define WSREP_FLAG_IMPLICIT_DEPS ( 1ULL << 9 ) + +#define WSREP_FLAGS_LAST WSREP_FLAG_IMPLICIT_DEPS +#define WSREP_FLAGS_MASK ((WSREP_FLAGS_LAST << 1) - 1) + + +typedef uint64_t wsrep_trx_id_t; //!< application transaction ID +typedef uint64_t wsrep_conn_id_t; //!< application connection ID +typedef int64_t wsrep_seqno_t; //!< sequence number of a writeset, etc. +#ifdef __cplusplus +typedef bool wsrep_bool_t; +#else +typedef _Bool wsrep_bool_t; //!< should be the same as standard (C99) bool +#endif /* __cplusplus */ + +/*! undefined seqno */ +#define WSREP_SEQNO_UNDEFINED (-1) + + +/*! wsrep provider status codes */ +typedef enum wsrep_status +{ + WSREP_OK = 0, //!< success + WSREP_WARNING, //!< minor warning, error logged + WSREP_TRX_MISSING, //!< transaction is not known by wsrep + WSREP_TRX_FAIL, //!< transaction aborted, server can continue + WSREP_BF_ABORT, //!< trx was victim of brute force abort + WSREP_SIZE_EXCEEDED, //!< data exceeded maximum supported size + WSREP_CONN_FAIL, //!< error in client connection, must abort + WSREP_NODE_FAIL, //!< error in node state, wsrep must reinit + WSREP_FATAL, //!< fatal error, server must abort + WSREP_NOT_IMPLEMENTED, //!< feature not implemented + WSREP_NOT_ALLOWED //!< operation not allowed +} wsrep_status_t; + + +/*! wsrep callbacks status codes */ +typedef enum wsrep_cb_status +{ + WSREP_CB_SUCCESS = 0, //!< success (as in "not critical failure") + WSREP_CB_FAILURE //!< critical failure (consistency violation) + /* Technically, wsrep provider has no use for specific failure codes since + * there is nothing it can do about it but abort execution. Therefore any + * positive number shall indicate a critical failure. Optionally that value + * may be used by provider to come to a consensus about state consistency + * in a group of nodes. */ +} wsrep_cb_status_t; + + +/*! + * UUID type - for all unique IDs + */ +typedef union wsrep_uuid { + uint8_t data[16]; + size_t alignment; +} wsrep_uuid_t; + +/*! Undefined UUID */ +static const wsrep_uuid_t WSREP_UUID_UNDEFINED = {{0,}}; + +/*! UUID string representation length, terminating '\0' not included */ +#define WSREP_UUID_STR_LEN 36 + +/*! + * Scan UUID from string + * @return length of UUID string representation or negative error code + */ +extern int +wsrep_uuid_scan (const char* str, size_t str_len, wsrep_uuid_t* uuid); + +/*! + * Print UUID to string + * @return length of UUID string representation or negative error code + */ +extern int +wsrep_uuid_print (const wsrep_uuid_t* uuid, char* str, size_t str_len); + +/*! + * @brief Compare two UUIDs + * + * Performs a byte by byte comparison of lhs and rhs. + * Returns 0 if lhs and rhs match, otherwise -1 or 1 according to the + * difference of the first byte that differs in lsh and rhs. + * + * @return -1, 0, 1 if lhs is respectively smaller, equal, or greater than rhs + */ +extern int +wsrep_uuid_compare (const wsrep_uuid_t* lhs, const wsrep_uuid_t* rhs); + +#define WSREP_MEMBER_NAME_LEN 32 //!< maximum logical member name length +#define WSREP_INCOMING_LEN 256 //!< max Domain Name length + 0x00 + + +/*! + * Global transaction identifier + */ +typedef struct wsrep_gtid +{ + wsrep_uuid_t uuid; /*!< History UUID */ + wsrep_seqno_t seqno; /*!< Sequence number */ +} wsrep_gtid_t; + +/*! Undefined GTID */ +static const wsrep_gtid_t WSREP_GTID_UNDEFINED = {{{0, }}, -1}; + +/*! Minimum number of bytes guaranteed to store GTID string representation, + * terminating '\0' not included (36 + 1 + 20) */ +#define WSREP_GTID_STR_LEN 57 + + +/*! + * Scan GTID from string + * @return length of GTID string representation or negative error code + */ +extern int +wsrep_gtid_scan(const char* str, size_t str_len, wsrep_gtid_t* gtid); + +/*! + * Print GTID to string + * @return length of GTID string representation or negative error code + */ +extern int +wsrep_gtid_print(const wsrep_gtid_t* gtid, char* str, size_t str_len); + +/*! + * Source/server transaction ID (trx ID assigned at originating node) + */ +typedef struct wsrep_stid { + wsrep_uuid_t node; //!< source node ID + wsrep_trx_id_t trx; //!< local trx ID at source + wsrep_conn_id_t conn; //!< local connection ID at source +} wsrep_stid_t; + +/*! + * Transaction meta data + */ +typedef struct wsrep_trx_meta +{ + wsrep_gtid_t gtid; /*!< Global transaction identifier */ + wsrep_stid_t stid; /*!< Source transaction identifier */ + wsrep_seqno_t depends_on; /*!< Sequence number of the last transaction + this transaction may depend on */ +} wsrep_trx_meta_t; + +/*! Abstract data buffer structure */ +typedef struct wsrep_buf +{ + const void* ptr; /*!< Pointer to data buffer */ + size_t len; /*!< Length of buffer */ +} wsrep_buf_t; + +/*! Transaction handle struct passed for wsrep transaction handling calls */ +typedef struct wsrep_ws_handle +{ + wsrep_trx_id_t trx_id; //!< transaction ID + void* opaque; //!< opaque provider transaction context data +} wsrep_ws_handle_t; + +/*! + * member status + */ +typedef enum wsrep_member_status { + WSREP_MEMBER_UNDEFINED, //!< undefined state + WSREP_MEMBER_JOINER, //!< incomplete state, requested state transfer + WSREP_MEMBER_DONOR, //!< complete state, donates state transfer + WSREP_MEMBER_JOINED, //!< complete state + WSREP_MEMBER_SYNCED, //!< complete state, synchronized with group + WSREP_MEMBER_ERROR, //!< this and above is provider-specific error code + WSREP_MEMBER_MAX +} wsrep_member_status_t; + +/*! + * static information about a group member (some fields are tentative yet) + */ +typedef struct wsrep_member_info { + wsrep_uuid_t id; //!< group-wide unique member ID + char name[WSREP_MEMBER_NAME_LEN]; //!< human-readable name + char incoming[WSREP_INCOMING_LEN]; //!< address for client requests +} wsrep_member_info_t; + +/*! + * group status + */ +typedef enum wsrep_view_status { + WSREP_VIEW_PRIMARY, //!< primary group configuration (quorum present) + WSREP_VIEW_NON_PRIMARY, //!< non-primary group configuration (quorum lost) + WSREP_VIEW_DISCONNECTED, //!< not connected to group, retrying. + WSREP_VIEW_MAX +} wsrep_view_status_t; + +/*! + * view of the group + */ +typedef struct wsrep_view_info { + wsrep_gtid_t state_id; //!< global state ID + wsrep_seqno_t view; //!< global view number + wsrep_view_status_t status; //!< view status + wsrep_cap_t capabilities;//!< capabilities available in the view + int my_idx; //!< index of this member in the view + int memb_num; //!< number of members in the view + int proto_ver; //!< application protocol agreed on the view + wsrep_member_info_t members[1];//!< array of member information +} wsrep_view_info_t; + + +/*! + * @brief connected to group + * + * This handler is called once the first primary view is seen. + * The purpose of this call is to provide basic information only, + * like node UUID and group UUID. + */ +typedef enum wsrep_cb_status (*wsrep_connected_cb_t) ( + void* app_ctx, + const wsrep_view_info_t* view +); + + +/*! + * @brief group view handler + * + * This handler is called in *total order* corresponding to the group + * configuration change. It is to provide a vital information about + * new group view. + * + * @param app_ctx application context + * @param recv_ctx receiver context + * @param view new view on the group + * @param state current state + * @param state_len length of current state + */ +typedef enum wsrep_cb_status (*wsrep_view_cb_t) ( + void* app_ctx, + void* recv_ctx, + const wsrep_view_info_t* view, + const char* state, + size_t state_len +); + + +/*! + * Magic string to tell provider to engage into trivial (empty) state transfer. + * No data will be passed, but the node shall be considered JOINED. + * Should be passed in sst_req parameter of wsrep_sst_cb_t. + */ +#define WSREP_STATE_TRANSFER_TRIVIAL "trivial" + +/*! + * Magic string to tell provider not to engage in state transfer at all. + * The member will stay in WSREP_MEMBER_UNDEFINED state but will keep on + * receiving all writesets. + * Should be passed in sst_req parameter of wsrep_sst_cb_t. + */ +#define WSREP_STATE_TRANSFER_NONE "none" + + +/*! + * @brief Creates and returns State Snapshot Transfer request for provider. + * + * This handler is called whenever the node is found to miss some of events + * from the cluster history (e.g. fresh node joining the cluster). + * SST will be used if it is impossible (or impractically long) to replay + * missing events, which may be not known in advance, so the node must always + * be ready to accept full SST or abort in case event replay is impossible. + * + * Normally SST request is an opaque buffer that is passed to the + * chosen SST donor node and must contain information sufficient for + * donor to deliver SST (typically SST method and delivery address). + * See above macros WSREP_STATE_TRANSFER_TRIVIAL and WSREP_STATE_TRANSFER_NONE + * to modify the standard provider behavior. + * + * @note Currently it is assumed that sst_req is allocated using + * malloc()/calloc()/realloc() and it will be freed by + * wsrep provider. + * + * @param app_ctx application context + * @param sst_req location to store SST request + * @param sst_req_len location to store SST request length or error code, + * value of 0 means no SST. + */ +typedef enum wsrep_cb_status (*wsrep_sst_request_cb_t) ( + void* app_ctx, + void** sst_req, + size_t* sst_req_len +); + + +/*! + * @brief apply callback + * + * This handler is called from wsrep library to apply replicated writeset + * Must support brute force applying for multi-master operation + * + * @param recv_ctx receiver context pointer provided by the application + * @param ws_handle internal provider writeset handle + * @param flags WSREP_FLAG_... flags + * @param data data buffer containing the writeset + * @param meta transaction meta data of the writeset to be applied + * @param exit_loop set to true to exit receive loop + * + * @return error code: + * @retval 0 - success + * @retval non-0 - application-specific error code + */ +typedef enum wsrep_cb_status (*wsrep_apply_cb_t) ( + void* recv_ctx, + const wsrep_ws_handle_t* ws_handle, + uint32_t flags, + const wsrep_buf_t* data, + const wsrep_trx_meta_t* meta, + wsrep_bool_t* exit_loop +); + + +/*! + * @brief unordered callback + * + * This handler is called to execute unordered actions (actions that need not + * to be executed in any particular order) attached to writeset. + * + * @param recv_ctx receiver context pointer provided by the application + * @param data data buffer containing the writeset + */ +typedef enum wsrep_cb_status (*wsrep_unordered_cb_t) ( + void* recv_ctx, + const wsrep_buf_t* data +); + + +/*! + * @brief a callback to donate state snapshot + * + * This handler is called from wsrep library when it needs this node + * to deliver state to a new cluster member. + * No state changes will be committed for the duration of this call. + * Wsrep implementation may provide internal state to be transmitted + * to new cluster member for initial state. + * + * @param app_ctx application context + * @param recv_ctx receiver context + * @param str_msg state transfer request message + * @param gtid current state ID on this node + * @param state current wsrep internal state buffer + * @param bypass bypass snapshot transfer, only transfer uuid:seqno pair + */ +typedef enum wsrep_cb_status (*wsrep_sst_donate_cb_t) ( + void* app_ctx, + void* recv_ctx, + const wsrep_buf_t* str_msg, + const wsrep_gtid_t* state_id, + const wsrep_buf_t* state, + wsrep_bool_t bypass +); + + +/*! + * @brief a callback to signal application that wsrep state is synced + * with cluster + * + * This callback is called after wsrep library has got in sync with + * rest of the cluster. + * + * @param app_ctx application context + * + * @return wsrep_cb_status enum + */ +typedef enum wsrep_cb_status (*wsrep_synced_cb_t) (void* app_ctx); + + +/* + * An opaque encryption key of arbitrary size - provided by the application + * May contain not only the key, but also algorithm specification and the like. + */ +typedef wsrep_buf_t wsrep_enc_key_t; + +/* + * Initialization vector/nonce. Given that most symmetric ciphers use 16 byte + * blocks this can be made 32 bytes without much loss of generality. + * Must be set by provider to start an encryption/decrytpion operation. + */ +typedef char wsrep_enc_iv_t[32]; + +/* + * Encryption context that should be sufficient to deterministically encrypt/ + * decrypt a data buffer either standalone or as part of a stream. May be used + * passed in apply_cb() along with the encrypted replication events to + * application as well. + * + * @param key [in] can be a pointer to const since provider will have to keep + * the keys until the last writeset that uses the key is in the + * cache + * @param iv [in] initialization vector for the beginning of the new + * operation. + * @param ctx [in/out] ongoing operation context + * To initialize a new context the encrypt_cb() caller sets it to + * NULL, which signals the encryption of a new continuous buffer. + * In that case the callback allocates the new context (using + * supplied key and iv) and stores the pointer to it for + * processing subsequent data. + * The end of the operation is signaled by passing TRUE in the + * parameter `last` to the encryption callback, the callback then + * finishes any pending encryption and deallocates the context. + */ +typedef struct +{ + const wsrep_enc_key_t* key; + const wsrep_enc_iv_t* iv; + void* ctx; +} +wsrep_enc_ctx_t; + +/* + * Encryption direction + */ +typedef enum +{ + WSREP_ENC = 0, /* encryption */ + WSREP_DEC = 1 /* decryption */ +} +wsrep_enc_direction_t; + +/* + * Encryption/decryption callback. Must be used by both provider and the + * application to obtain identical results. Can be NULL for no encryption. + * + * @param app_ctx application context + * @param enc_ctx current operation context + * @param input input data buffer + * @param output an output buffer, must be at least the size of the input + * data plus unwritten bytes from the previous call(s). E.g. in + * block mode, encryption/decryption operation will write data + * to output in multiples of the algoritm block size. So a call + * to encrypt a single byte won't normally write anything to + * output waiting for the next input chunk. So on the next call + * it may write one byte more than was given in the input. + * @param direction of the operation (encryption/decryption) + * @param last true if this is the last buffer to encrypt in the stream. + * In that case the callback shall write the remaining bytes of + * the stream to output (if any) and deallocate ctx->ctx if + * allocated previously + * + * @return a number of bytes written to output or a negative error code. + */ +typedef int (*wsrep_encrypt_cb_t) +( + void* app_ctx, + wsrep_enc_ctx_t* enc_ctx, + const wsrep_buf_t* input, + void* output, + wsrep_enc_direction_t direction, + bool last +); + + +/*! + * Initialization parameters for wsrep provider. + */ +struct wsrep_init_args +{ + void* app_ctx; //!< Application context for callbacks + + /* Configuration parameters */ + const char* node_name; //!< Symbolic name of this node (e.g. hostname) + const char* node_address; //!< Address to be used by wsrep provider + const char* node_incoming; //!< Address for incoming client connections + const char* data_dir; //!< Directory where wsrep files are kept if any + const char* options; //!< Provider-specific configuration string + int proto_ver; //!< Max supported application protocol version + + /* Application initial state information. */ + const wsrep_gtid_t* state_id; //!< Application state GTID + const wsrep_buf_t* state; //!< Initial state for wsrep provider + + /* Application callbacks */ + wsrep_log_cb_t logger_cb; //!< logging handler + wsrep_connected_cb_t connected_cb; //!< connected to group + wsrep_view_cb_t view_cb; //!< group view change handler + wsrep_sst_request_cb_t sst_request_cb; //!< SST request creator + wsrep_encrypt_cb_t encrypt_cb; //!< Encryption callback + + /* Applier callbacks */ + wsrep_apply_cb_t apply_cb; //!< apply callback + wsrep_unordered_cb_t unordered_cb; //!< callback for unordered actions + + /* State Snapshot Transfer callbacks */ + wsrep_sst_donate_cb_t sst_donate_cb; //!< donate SST + wsrep_synced_cb_t synced_cb; //!< synced with group +}; + + +/*! Type of the stats variable value in struct wsrep_status_var */ +typedef enum wsrep_var_type +{ + WSREP_VAR_STRING, //!< pointer to null-terminated string + WSREP_VAR_INT64, //!< int64_t + WSREP_VAR_DOUBLE //!< double +} +wsrep_var_type_t; + +/*! Generalized stats variable representation */ +struct wsrep_stats_var +{ + const char* name; //!< variable name + wsrep_var_type_t type; //!< variable value type + union { + int64_t _int64; + double _double; + const char* _string; + } value; //!< variable value +}; + + +/*! Key struct used to pass certification keys for transaction handling calls. + * A key consists of zero or more key parts. */ +typedef struct wsrep_key +{ + const wsrep_buf_t* key_parts; /*!< Array of key parts */ + size_t key_parts_num; /*!< Number of key parts */ +} wsrep_key_t; + +/*! Key type: + * SHARED - higher level resource shared between clients, e.g. SQL table + * REFERENCE - resource referenced but not modified, e.g. parent row + * UPDATE - resource is modified + * EXCLUSIVE - resource is either created or deleted */ +typedef enum wsrep_key_type +{ + WSREP_KEY_SHARED = 0, + WSREP_KEY_REFERENCE, + WSREP_KEY_UPDATE, + WSREP_KEY_EXCLUSIVE +} wsrep_key_type_t; + +/*! Data type: + * ORDERED state modification event that should be applied and committed + * in order. + * UNORDERED some action that does not modify state and execution of which is + * optional and does not need to happen in order. + * ANNOTATION (human readable) writeset annotation. */ +typedef enum wsrep_data_type +{ + WSREP_DATA_ORDERED = 0, + WSREP_DATA_UNORDERED, + WSREP_DATA_ANNOTATION +} wsrep_data_type_t; + + +/*! + * @brief Helper method to reset trx writeset handle state when trx id changes + * + * Instead of passing wsrep_ws_handle_t directly to wsrep calls, + * wrapping handle with this call offloads bookkeeping from + * application. + */ +static inline wsrep_ws_handle_t* wsrep_ws_handle_for_trx( + wsrep_ws_handle_t* ws_handle, + wsrep_trx_id_t trx_id) +{ + if (ws_handle->trx_id != trx_id) + { + ws_handle->trx_id = trx_id; + ws_handle->opaque = NULL; + } + return ws_handle; +} + + +/*! + * A handle for processing preordered actions. + * Must be initialized to WSREP_PO_INITIALIZER before use. + */ +typedef struct wsrep_po_handle { void* opaque; } wsrep_po_handle_t; + +static const wsrep_po_handle_t WSREP_PO_INITIALIZER = { NULL }; + + +typedef struct wsrep_st wsrep_t; +/*! + * wsrep interface for dynamically loadable libraries + */ +struct wsrep_st { + + const char *version; //!< interface version string + + /*! + * @brief Initializes wsrep provider + * + * @param wsrep provider handle + * @param args wsrep initialization parameters + */ + wsrep_status_t (*init) (wsrep_t* wsrep, + const struct wsrep_init_args* args); + + /*! + * @brief Returns provider capabilities bitmap + * + * Note that these are potential provider capabilities. Provider will + * offer only capabilities supported by all members in the view + * (see wsrep_view_info). + * + * @param wsrep provider handle + */ + wsrep_cap_t (*capabilities) (wsrep_t* wsrep); + + /*! + * @brief Passes provider-specific configuration string to provider. + * + * @param wsrep provider handle + * @param conf configuration string + * + * @retval WSREP_OK configuration string was parsed successfully + * @retval WSREP_WARNING could not parse configuration string, no action taken + */ + wsrep_status_t (*options_set) (wsrep_t* wsrep, const char* conf); + + /*! + * @brief Returns provider-specific string with current configuration values. + * + * @param wsrep provider handle + * + * @return a dynamically allocated string with current configuration + * parameter values + */ + char* (*options_get) (wsrep_t* wsrep); + + /*! + * @brief A call to set/rotate the key in provider. + * + * This may happen asynchronously and so is a best effort operation. + * Some buffers may still be encrypted with a previous key. + * + * @param a key object for the encryption callback + * + * return success or an error code + */ + wsrep_status_t (*enc_set_key)(wsrep_t* wsrep, const wsrep_enc_key_t* key); + + /*! + * @brief Opens connection to cluster + * + * Returns when either node is ready to operate as a part of the cluster + * or fails to reach operating status. + * + * @param wsrep provider handle + * @param cluster_name unique symbolic cluster name + * @param cluster_url URL-like cluster address (backend://address) + * @param state_donor name of the node to be asked for state transfer. + * @param bootstrap a flag to request initialization of a new wsrep + * service rather then a connection to the existing one. + * cluster_url may still carry important initialization + * parameters, like backend spec and/or listen address. + */ + wsrep_status_t (*connect) (wsrep_t* wsrep, + const char* cluster_name, + const char* cluster_url, + const char* state_donor, + wsrep_bool_t bootstrap); + + /*! + * @brief Closes connection to cluster. + * + * @param wsrep this wsrep handler + */ + wsrep_status_t (*disconnect)(wsrep_t* wsrep); + + /*! + * @brief start receiving replication events + * + * This function does not return until provider is closed or \p exit_loop + * parameter to wsrep_apply_cb_t() is set to true. + * + * @param wsrep provider handle + * @param recv_ctx receiver context + */ + wsrep_status_t (*recv)(wsrep_t* wsrep, void* recv_ctx); + + /*! + * @brief Tells provider that a given writeset has a read view associated + * with it. + * + * @param wsrep provider handle + * @param handle writeset handle + * @param rv read view GTID established by the caller or if NULL, + * provider will infer it internally. + */ + wsrep_status_t (*assign_read_view)(wsrep_t* wsrep, + wsrep_ws_handle_t* handle, + const wsrep_gtid_t* rv); + + /*! + * @brief Certifies transaction with provider. + * + * Must be called before transaction commit. Returns success code, which + * caller must check. + * + * In case of WSREP_OK, transaction can proceed to commit. + * Otherwise transaction must rollback. + * + * In case of a failure there are two conceptually different situations: + * - the writeset was not ordered. In that case meta struct shall contain + * undefined GTID: WSREP_UUID_UNDEFINED:WSREP_SEQNO_UNDEFINED. + * - the writeset was successfully ordered, but failed certification. + * In this case meta struct shall contain a valid GTID. + * + * Regardless of the return code, if meta struct contains a valid GTID + * the commit order critical section must be entered with that GTID. + * + * @param wsrep provider handle + * @param conn_id connection ID + * @param ws_handle writeset of committing transaction + * @param flags fine tuning the replication WSREP_FLAG_* + * @param meta transaction meta data + * + * @retval WSREP_OK writeset successfully certified, can commit + * @retval WSREP_TRX_FAIL must rollback transaction + * @retval WSREP_CONN_FAIL must close client connection + * @retval WSREP_NODE_FAIL must close all connections and reinit + */ + wsrep_status_t (*certify)(wsrep_t* wsrep, + wsrep_conn_id_t conn_id, + wsrep_ws_handle_t* ws_handle, + uint32_t flags, + wsrep_trx_meta_t* meta); + + /*! + * @brief Enters commit order critical section. + * + * Anything executed between this call and commit_order_leave() will be + * executed in provider enforced order. + * + * @param wsrep provider handle + * @param ws_handle internal provider writeset handle + * @param meta transaction meta data + * + * @retval WSREP_OK commit order entered successfully + * @retval WSREP_NODE_FAIL must close all connections and reinit + */ + wsrep_status_t (*commit_order_enter)(wsrep_t* wsrep, + const wsrep_ws_handle_t* ws_handle, + const wsrep_trx_meta_t* meta); + + /*! + * @brief Leaves commit order critical section + * + * Anything executed between commit_order_enter() and this call will be + * executed in provider enforced order. + * + * @param wsrep provider handle + * @param ws_handle internal provider writeset handle + * @param meta transaction meta data + * @param error buffer containing error info (null/empty for no error) + * + * @retval WSREP_OK commit order left successfully + * @retval WSREP_NODE_FAIL must close all connections and reinit + */ + wsrep_status_t (*commit_order_leave)(wsrep_t* wsrep, + const wsrep_ws_handle_t* ws_handle, + const wsrep_trx_meta_t* meta, + const wsrep_buf_t* error); + + /*! + * @brief Releases resources after transaction commit/rollback. + * + * Ends total order critical section. + * + * @param wsrep provider handle + * @param ws_handle writeset of committing transaction + * @retval WSREP_OK release succeeded + */ + wsrep_status_t (*release) (wsrep_t* wsrep, + wsrep_ws_handle_t* ws_handle); + + /*! + * @brief Replay trx as a slave writeset + * + * If local trx has been aborted by brute force, and it has already + * replicated before this abort, we must try if we can apply it as + * slave trx. Note that slave nodes see only trx writesets and certification + * test based on write set content can be different to DBMS lock conflicts. + * + * @param wsrep provider handle + * @param ws_handle writeset of committing transaction + * @param trx_ctx transaction context + * + * @retval WSREP_OK cluster commit succeeded + * @retval WSREP_TRX_FAIL must rollback transaction + * @retval WSREP_BF_ABORT brute force abort happened after trx replicated + * must rollback transaction and try to replay + * @retval WSREP_CONN_FAIL must close client connection + * @retval WSREP_NODE_FAIL must close all connections and reinit + */ + wsrep_status_t (*replay_trx)(wsrep_t* wsrep, + const wsrep_ws_handle_t* ws_handle, + void* trx_ctx); + + /*! + * @brief Abort certify() call of another thread. + * + * It is possible, that some high-priority transaction needs to abort + * another transaction which is in certify() call waiting for resources. + * + * The kill routine checks that abort is not attempted against a transaction + * which is front of the caller (in total order). + * + * If the abort was successful, the victim sequence number is stored + * into location pointed by the victim_seqno. + * + * @param wsrep provider handle + * @param bf_seqno seqno of brute force trx, running this cancel + * @param victim_trx transaction to be aborted, and which is committing + * @param victim_seqno seqno of the victim transaction if assigned + * + * @retval WSREP_OK abort succeeded + * @retval WSREP_NOT_ALLOWED the provider declined the abort request + * @retval WSREP_TRX_MISSING the victim_trx was missing + * @retval WSREP_WARNING abort failed + */ + wsrep_status_t (*abort_certification)(wsrep_t* wsrep, + wsrep_seqno_t bf_seqno, + wsrep_trx_id_t victim_trx, + wsrep_seqno_t* victim_seqno); + + /*! + * @brief Send a rollback fragment on behalf of trx + * + * @param wsrep provider handle + * @param trx transaction to be rolled back + * @param data data to append to the fragment + * + * @retval WSREP_OK rollback fragment sent successfully + */ + wsrep_status_t (*rollback)(wsrep_t* wsrep, + wsrep_trx_id_t trx, + const wsrep_buf_t* data); + + /*! + * @brief Appends a row reference to transaction writeset + * + * Both copy flag and key_type can be ignored by provider (key type + * interpreted as WSREP_KEY_EXCLUSIVE). + * + * @param wsrep provider handle + * @param ws_handle writeset handle + * @param keys array of keys + * @param count length of the array of keys + * @param type type of the key + * @param copy can be set to FALSE if keys persist through commit. + */ + wsrep_status_t (*append_key)(wsrep_t* wsrep, + wsrep_ws_handle_t* ws_handle, + const wsrep_key_t* keys, + size_t count, + enum wsrep_key_type type, + wsrep_bool_t copy); + + /*! + * @brief Appends data to transaction writeset + * + * This method can be called any time before certify() call and it appends + * a data buffer to the transaction writeset. + * Repeated calls of the method will result in direct buffer concatenation + * and all data will be passed as a single buffer to the apply callback. + * + * Both copy and unordered flags can be ignored by provider. + * + * @param wsrep provider handle + * @param ws_handle writeset handle + * @param data array of data buffers + * @param count buffer count + * @param type type of data + * @param copy can be set to FALSE if data persists through commit. + */ + wsrep_status_t (*append_data)(wsrep_t* wsrep, + wsrep_ws_handle_t* ws_handle, + const wsrep_buf_t* data, + size_t count, + enum wsrep_data_type type, + wsrep_bool_t copy); + + /*! + * @brief Blocks until the given GTID is committed + * + * This call will block the caller until the given GTID + * is guaranteed to be committed, or until a timeout occurs. + * The timeout value is given in parameter tout, if tout is -1, + * then the global causal read timeout applies. + * + * If no pointer upto is provided the call will block until + * causal ordering with all possible preceding writes in the + * cluster is guaranteed. + * + * If pointer to gtid is non-null, the call stores the global + * transaction ID of the last transaction which is guaranteed + * to be committed when the call returns. + * + * @param wsrep provider handle + * @param upto gtid to wait upto + * @param tout timeout in seconds + * -1 wait for global causal read timeout + * @param gtid location to store GTID + */ + wsrep_status_t (*sync_wait)(wsrep_t* wsrep, + wsrep_gtid_t* upto, + int tout, + wsrep_gtid_t* gtid); + + /*! + * @brief Returns the last committed gtid + * + * @param gtid location to store GTID + */ + wsrep_status_t (*last_committed_id)(wsrep_t* wsrep, + wsrep_gtid_t* gtid); + + /*! + * @brief Clears allocated connection context. + * + * Whenever a new connection ID is passed to wsrep provider through + * any of the API calls, a connection context is allocated for this + * connection. This call is to explicitly notify provider of connection + * closing. + * + * @param wsrep provider handle + * @param conn_id connection ID + * @param query the 'set database' query + * @param query_len length of query (does not end with 0) + */ + wsrep_status_t (*free_connection)(wsrep_t* wsrep, + wsrep_conn_id_t conn_id); + + /*! + * @brief Replicates a query and starts "total order isolation" section. + * + * Regular mode: + * + * Replicates the action spec and returns success code, which caller must + * check. Total order isolation continues until to_execute_end() is called. + * Regular "total order isolation" is achieved by calling to_execute_start() + * with WSREP_FLAG_TRX_START and WSREP_FLAG_TRX_END set. + * + * Two-phase mode: + * + * In this mode a query execution is split in two phases. The first phase is + * acquiring total order isolation to access critical section and the + * second phase is to release acquired resources in total order. + * + * To start the first phase the call is made with WSREP_FLAG_TRX_START set. + * The action is replicated and success code is returned. The total order + * isolation continues until to_execute_end() is called. However, the provider + * will keep the reference to the operation for conflict resolution purposes. + * + * The second phase is started with WSREP_FLAG_TRX_END set. Provider + * returns once it has achieved total ordering isolation for second phase. + * Total order isolation continues until to_execute_end() is called. + * All references to the operation are cleared by provider before + * call to to_execute_end() returns. + * + * @param wsrep provider handle + * @param conn_id connection ID + * @param keys array of keys + * @param keys_num length of the array of keys + * @param action action buffer array to be executed + * @param count action buffer count + * @param flags flags + * @param meta transaction meta data + * + * @retval WSREP_OK cluster commit succeeded + * @retval WSREP_CONN_FAIL must close client connection + * @retval WSREP_NODE_FAIL must close all connections and reinit + */ + wsrep_status_t (*to_execute_start)(wsrep_t* wsrep, + wsrep_conn_id_t conn_id, + const wsrep_key_t* keys, + size_t keys_num, + const wsrep_buf_t* action, + size_t count, + uint32_t flags, + wsrep_trx_meta_t* meta); + + /*! + * @brief Ends the total order isolation section. + * + * Marks the end of total order isolation. TO locks are freed + * and other transactions are free to commit from this point on. + * + * @param wsrep provider handle + * @param conn_id connection ID + * @param error error information about TOI operation (empty for no error) + * + * @retval WSREP_OK cluster commit succeeded + * @retval WSREP_CONN_FAIL must close client connection + * @retval WSREP_NODE_FAIL must close all connections and reinit + */ + wsrep_status_t (*to_execute_end)(wsrep_t* wsrep, + wsrep_conn_id_t conn_id, + const wsrep_buf_t* error); + + + /*! + * @brief Collects preordered replication events into a writeset. + * + * @param wsrep wsrep provider handle + * @param handle a handle associated with a given writeset + * @param data an array of data buffers. + * @param count length of data buffer array. + * @param copy whether provider needs to make a copy of events. + * + * @retval WSREP_OK cluster-wide commit succeeded + * @retval WSREP_TRX_FAIL operation failed (e.g. trx size exceeded limit) + * @retval WSREP_NODE_FAIL must close all connections and reinit + */ + wsrep_status_t (*preordered_collect) (wsrep_t* wsrep, + wsrep_po_handle_t* handle, + const wsrep_buf_t* data, + size_t count, + wsrep_bool_t copy); + + /*! + * @brief "Commits" preordered writeset to cluster. + * + * The contract is that the writeset will be committed in the same (partial) + * order this method was called. Frees resources associated with the writeset + * handle and reinitializes the handle. + * + * @param wsrep wsrep provider handle + * @param po_handle a handle associated with a given writeset + * @param source_id ID of the event producer, also serves as the partial order + * or stream ID - events with different source_ids won't be + * ordered with respect to each other. + * @param flags WSREP_FLAG_... flags + * @param pa_range the number of preceding events this event can be processed + * in parallel with. A value of 0 means strict serial + * processing. Note: commits always happen in wsrep order. + * @param commit 'true' to commit writeset to cluster (replicate) or + * 'false' to rollback (cancel) the writeset. + * + * @retval WSREP_OK cluster-wide commit succeeded + * @retval WSREP_TRX_FAIL operation failed (e.g. NON-PRIMARY component) + * @retval WSREP_NODE_FAIL must close all connections and reinit + */ + wsrep_status_t (*preordered_commit) (wsrep_t* wsrep, + wsrep_po_handle_t* handle, + const wsrep_uuid_t* source_id, + uint32_t flags, + int pa_range, + wsrep_bool_t commit); + + /*! + * @brief Signals to wsrep provider that state snapshot has been sent to + * joiner. + * + * @param wsrep provider handle + * @param state_id state ID + * @param rcode 0 or negative error code of the operation. + */ + wsrep_status_t (*sst_sent)(wsrep_t* wsrep, + const wsrep_gtid_t* state_id, + int rcode); + + /*! + * @brief Signals to wsrep provider that new state snapshot has been received. + * May deadlock if called from sst_prepare_cb. + * + * @param wsrep provider handle + * @param state_id state ID + * @param state initial state provided by SST donor + * @param rcode 0 or negative error code of the operation. + */ + wsrep_status_t (*sst_received)(wsrep_t* wsrep, + const wsrep_gtid_t* state_id, + const wsrep_buf_t* state, + int rcode); + + + /*! + * @brief Generate request for consistent snapshot. + * + * If successful, this call will generate internally SST request + * which in turn triggers calling SST donate callback on the nodes + * specified in donor_spec. If donor_spec is null, callback is + * called only locally. This call will block until sst_sent is called + * from callback. + * + * @param wsrep provider handle + * @param msg context message for SST donate callback + * @param msg_len length of context message + * @param donor_spec list of snapshot donors + */ + wsrep_status_t (*snapshot)(wsrep_t* wsrep, + const wsrep_buf_t* msg, + const char* donor_spec); + + /*! + * @brief Returns an array of status variables. + * Array is terminated by Null variable name. + * + * @param wsrep provider handle + * @return array of struct wsrep_status_var. + */ + struct wsrep_stats_var* (*stats_get) (wsrep_t* wsrep); + + /*! + * @brief Release resources that might be associated with the array. + * + * @param wsrep provider handle. + * @param var_array array returned by stats_get(). + */ + void (*stats_free) (wsrep_t* wsrep, struct wsrep_stats_var* var_array); + + /*! + * @brief Reset some stats variables to initial value, provider-dependent. + * + * @param wsrep provider handle. + */ + void (*stats_reset) (wsrep_t* wsrep); + + /*! + * @brief Pauses writeset applying/committing. + * + * @return global sequence number of the paused state or negative error code. + */ + wsrep_seqno_t (*pause) (wsrep_t* wsrep); + + /*! + * @brief Resumes writeset applying/committing. + */ + wsrep_status_t (*resume) (wsrep_t* wsrep); + + /*! + * @brief Desynchronize from cluster + * + * Effectively turns off flow control for this node, allowing it + * to fall behind the cluster. + */ + wsrep_status_t (*desync) (wsrep_t* wsrep); + + /*! + * @brief Request to resynchronize with cluster. + * + * Effectively turns on flow control. Asynchronous - actual synchronization + * event to be delivered via sync_cb. + */ + wsrep_status_t (*resync) (wsrep_t* wsrep); + + /*! + * @brief Acquire global named lock + * + * @param wsrep wsrep provider handle + * @param name lock name + * @param shared shared or exclusive lock + * @param owner 64-bit owner ID + * @param tout timeout in nanoseconds. + * 0 - return immediately, -1 wait forever. + * @return wsrep status or negative error code + * @retval -EDEADLK lock was already acquired by this thread + * @retval -EBUSY lock was busy + */ + wsrep_status_t (*lock) (wsrep_t* wsrep, + const char* name, wsrep_bool_t shared, + uint64_t owner, int64_t tout); + + /*! + * @brief Release global named lock + * + * @param wsrep wsrep provider handle + * @param name lock name + * @param owner 64-bit owner ID + * @return wsrep status or negative error code + * @retval -EPERM lock does not belong to this owner + */ + wsrep_status_t (*unlock) (wsrep_t* wsrep, const char* name, uint64_t owner); + + /*! + * @brief Check if global named lock is locked + * + * @param wsrep wsrep provider handle + * @param name lock name + * @param owner if not NULL will contain 64-bit owner ID + * @param node if not NULL will contain owner's node UUID + * @return true if lock is locked + */ + wsrep_bool_t (*is_locked) (wsrep_t* wsrep, const char* name, uint64_t* conn, + wsrep_uuid_t* node); + + /*! + * wsrep provider name + */ + const char* provider_name; + + /*! + * wsrep provider version + */ + const char* provider_version; + + /*! + * wsrep provider vendor name + */ + const char* provider_vendor; + + /*! + * @brief Frees allocated resources before unloading the library. + * @param wsrep provider handle + */ + void (*free)(wsrep_t* wsrep); + + void *dlh; //!< reserved for future use + void *ctx; //!< reserved for implementation private context +}; + + +/*! + * + * @brief Loads wsrep library + * + * @param spec path to wsrep library. If NULL or WSREP_NONE initializes dummy + * pass-through implementation. + * @param hptr wsrep handle + * @param log_cb callback to handle loader messages. Otherwise writes to stderr. + * + * @return zero on success, errno on failure + */ +int wsrep_load(const char* spec, wsrep_t** hptr, wsrep_log_cb_t log_cb); + +/*! + * @brief Unloads the wsrep library. The application must call + * wsrep->free() before unload to release library side resources. + * + * @param hptr wsrep handler pointer + */ +void wsrep_unload(wsrep_t* hptr); + +#ifdef __cplusplus +} +#endif + +#endif /* WSREP_H */ diff --git a/wsrep-lib/wsrep-API/v26/wsrep_dummy.c b/wsrep-lib/wsrep-API/v26/wsrep_dummy.c new file mode 100644 index 00000000..7bd441dc --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/wsrep_dummy.c @@ -0,0 +1,462 @@ +/* Copyright (C) 2009-2010 Codership Oy <info@codersihp.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +/*! @file Dummy wsrep API implementation. */ + +#include "wsrep_api.h" + +#include <errno.h> +#include <stdbool.h> +#include <string.h> + +/*! Dummy backend context. */ +typedef struct wsrep_dummy +{ + wsrep_log_cb_t log_fn; + char* options; +} wsrep_dummy_t; + +/* Get pointer to wsrep_dummy context from wsrep_t pointer */ +#define WSREP_DUMMY(_p) ((wsrep_dummy_t *) (_p)->ctx) + +/* Trace function usage a-la DBUG */ +#define WSREP_DBUG_ENTER(_w) do { \ + if (WSREP_DUMMY(_w)) { \ + if (WSREP_DUMMY(_w)->log_fn) \ + WSREP_DUMMY(_w)->log_fn(WSREP_LOG_DEBUG, __FUNCTION__); \ + } \ + } while (0) + + +static void dummy_free(wsrep_t *w) +{ + if (!w->ctx) return; + + WSREP_DBUG_ENTER(w); + if (WSREP_DUMMY(w)->options) { + free(WSREP_DUMMY(w)->options); + WSREP_DUMMY(w)->options = NULL; + } + free(w->ctx); + w->ctx = NULL; +} + +static wsrep_status_t dummy_init (wsrep_t* w, + const struct wsrep_init_args* args) +{ + WSREP_DUMMY(w)->log_fn = args->logger_cb; + WSREP_DBUG_ENTER(w); + if (args->options) { + WSREP_DUMMY(w)->options = strdup(args->options); + } + return WSREP_OK; +} + +static wsrep_cap_t dummy_capabilities (wsrep_t* w __attribute__((unused))) +{ + return 0; +} + +static wsrep_status_t dummy_options_set( + wsrep_t* w, + const char* conf) +{ + WSREP_DBUG_ENTER(w); + if (WSREP_DUMMY(w)->options) { + free(WSREP_DUMMY(w)->options); + WSREP_DUMMY(w)->options = NULL; + } + if (conf) { + WSREP_DUMMY(w)->options = strdup(conf); + } + return WSREP_OK; +} + +static char* dummy_options_get (wsrep_t* w) +{ + WSREP_DBUG_ENTER(w); + return strdup(WSREP_DUMMY(w)->options); +} + +static wsrep_status_t dummy_enc_set_key( + wsrep_t* w, + const wsrep_enc_key_t* key __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_connect( + wsrep_t* w, + const char* name __attribute__((unused)), + const char* url __attribute__((unused)), + const char* donor __attribute__((unused)), + wsrep_bool_t bootstrap __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_disconnect(wsrep_t* w) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_recv(wsrep_t* w, + void* recv_ctx __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_assign_read_view( + wsrep_t* w, + wsrep_ws_handle_t* ws_handle __attribute__((unused)), + const wsrep_gtid_t* rv __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_certify( + wsrep_t* w, + const wsrep_conn_id_t conn_id __attribute__((unused)), + wsrep_ws_handle_t* ws_handle __attribute__((unused)), + uint32_t flags __attribute__((unused)), + wsrep_trx_meta_t* meta __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_commit_order_enter( + wsrep_t* w, + const wsrep_ws_handle_t* ws_handle __attribute__((unused)), + const wsrep_trx_meta_t* meta __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_commit_order_leave( + wsrep_t* w, + const wsrep_ws_handle_t* ws_handle __attribute__((unused)), + const wsrep_trx_meta_t* meta __attribute__((unused)), + const wsrep_buf_t* error __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_release( + wsrep_t* w, + wsrep_ws_handle_t* ws_handle __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_replay_trx( + wsrep_t* w, + const wsrep_ws_handle_t* ws_handle __attribute__((unused)), + void* trx_ctx __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_abort_certification( + wsrep_t* w, + const wsrep_seqno_t bf_seqno __attribute__((unused)), + const wsrep_trx_id_t trx_id __attribute__((unused)), + wsrep_seqno_t *victim_seqno __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_rollback( + wsrep_t* w, + const wsrep_trx_id_t trx __attribute__((unused)), + const wsrep_buf_t* data __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_append_key( + wsrep_t* w, + wsrep_ws_handle_t* ws_handle __attribute__((unused)), + const wsrep_key_t* key __attribute__((unused)), + const size_t key_num __attribute__((unused)), + const wsrep_key_type_t key_type __attribute__((unused)), + const bool copy __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_append_data( + wsrep_t* w, + wsrep_ws_handle_t* ws_handle __attribute__((unused)), + const struct wsrep_buf* data __attribute__((unused)), + const size_t count __attribute__((unused)), + const wsrep_data_type_t type __attribute__((unused)), + const bool copy __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_sync_wait( + wsrep_t* w, + wsrep_gtid_t* upto __attribute__((unused)), + int tout __attribute__((unused)), + wsrep_gtid_t* gtid __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_last_committed_id( + wsrep_t* w, + wsrep_gtid_t* gtid __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_free_connection( + wsrep_t* w, + const wsrep_conn_id_t conn_id __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_to_execute_start( + wsrep_t* w, + const wsrep_conn_id_t conn_id __attribute__((unused)), + const wsrep_key_t* key __attribute__((unused)), + const size_t key_num __attribute__((unused)), + const struct wsrep_buf* data __attribute__((unused)), + const size_t count __attribute__((unused)), + const uint32_t flags __attribute__((unused)), + wsrep_trx_meta_t* meta __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_to_execute_end( + wsrep_t* w, + const wsrep_conn_id_t conn_id __attribute__((unused)), + const wsrep_buf_t* err __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_preordered_collect( + wsrep_t* w, + wsrep_po_handle_t* handle __attribute__((unused)), + const struct wsrep_buf* data __attribute__((unused)), + size_t count __attribute__((unused)), + wsrep_bool_t copy __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_preordered_commit( + wsrep_t* w, + wsrep_po_handle_t* handle __attribute__((unused)), + const wsrep_uuid_t* source_id __attribute__((unused)), + uint32_t flags __attribute__((unused)), + int pa_range __attribute__((unused)), + wsrep_bool_t commit __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_sst_sent( + wsrep_t* w, + const wsrep_gtid_t* state_id __attribute__((unused)), + const int rcode __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_sst_received( + wsrep_t* w, + const wsrep_gtid_t* state_id __attribute__((unused)), + const wsrep_buf_t* state __attribute__((unused)), + const int rcode __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_snapshot( + wsrep_t* w, + const wsrep_buf_t* msg __attribute__((unused)), + const char* donor_spec __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static struct wsrep_stats_var dummy_stats[] = { + { NULL, WSREP_VAR_STRING, { 0 } } +}; + +static struct wsrep_stats_var* dummy_stats_get (wsrep_t* w) +{ + WSREP_DBUG_ENTER(w); + return dummy_stats; +} + +static void dummy_stats_free ( + wsrep_t* w, + struct wsrep_stats_var* stats __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); +} + +static void dummy_stats_reset (wsrep_t* w) +{ + WSREP_DBUG_ENTER(w); +} + +static wsrep_seqno_t dummy_pause (wsrep_t* w) +{ + WSREP_DBUG_ENTER(w); + return -ENOSYS; +} + +static wsrep_status_t dummy_resume (wsrep_t* w) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_desync (wsrep_t* w) +{ + WSREP_DBUG_ENTER(w); + return WSREP_NOT_IMPLEMENTED; +} + +static wsrep_status_t dummy_resync (wsrep_t* w) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_lock (wsrep_t* w, + const char* s __attribute__((unused)), + bool r __attribute__((unused)), + uint64_t o __attribute__((unused)), + int64_t t __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_NOT_IMPLEMENTED; +} + +static wsrep_status_t dummy_unlock (wsrep_t* w, + const char* s __attribute__((unused)), + uint64_t o __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static bool dummy_is_locked (wsrep_t* w, + const char* s __attribute__((unused)), + uint64_t* o __attribute__((unused)), + wsrep_uuid_t* t __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return false; +} + +static wsrep_t dummy_iface = { + WSREP_INTERFACE_VERSION, + &dummy_init, + &dummy_capabilities, + &dummy_options_set, + &dummy_options_get, + &dummy_enc_set_key, + &dummy_connect, + &dummy_disconnect, + &dummy_recv, + &dummy_assign_read_view, + &dummy_certify, + &dummy_commit_order_enter, + &dummy_commit_order_leave, + &dummy_release, + &dummy_replay_trx, + &dummy_abort_certification, + &dummy_rollback, + &dummy_append_key, + &dummy_append_data, + &dummy_sync_wait, + &dummy_last_committed_id, + &dummy_free_connection, + &dummy_to_execute_start, + &dummy_to_execute_end, + &dummy_preordered_collect, + &dummy_preordered_commit, + &dummy_sst_sent, + &dummy_sst_received, + &dummy_snapshot, + &dummy_stats_get, + &dummy_stats_free, + &dummy_stats_reset, + &dummy_pause, + &dummy_resume, + &dummy_desync, + &dummy_resync, + &dummy_lock, + &dummy_unlock, + &dummy_is_locked, + WSREP_NONE, + WSREP_INTERFACE_VERSION, + "Codership Oy <info@codership.com>", + &dummy_free, + NULL, + NULL +}; + +int wsrep_dummy_loader(wsrep_t* w) +{ + if (!w) + return EINVAL; + + *w = dummy_iface; + + // allocate private context + if (!(w->ctx = malloc(sizeof(wsrep_dummy_t)))) + return ENOMEM; + + // initialize private context + WSREP_DUMMY(w)->log_fn = NULL; + WSREP_DUMMY(w)->options = NULL; + + return 0; +} diff --git a/wsrep-lib/wsrep-API/v26/wsrep_event_service.h b/wsrep-lib/wsrep-API/v26/wsrep_event_service.h new file mode 100644 index 00000000..2d0d16ea --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/wsrep_event_service.h @@ -0,0 +1,105 @@ +/* + * Copyright (C) 2021 Codership Oy <info@codership.com> + * + * This file is part of wsrep-API. + * + * Wsrep-API is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-API is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-API. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file wsrep_event_service.h + * + * This file defines interface for various unordered events generated by the + * cluster or the provider. + * + * An event has a name and a payload, both are null-terminated strings. + * (It is intended that payload is a JSON encoded structure). + * The name serves to distinguish the events to pass them to respective + * handlers. + * + * The provider which is capable of using the service interface v1 must + * export the following functions. + * + * int wsrep_init_event_service_v1(wsrep_event_service_v1_t*) + * void wsrep_deinit_event_service_v1() + * + * which can be probed by the application. + * + * The application must initialize the service via above init function + * before the provider is initialized via wsrep->init(). The deinit + * function must be called after the provider side resources have been + * released via wsrep->free(). + */ + +#ifndef WSREP_EVENT_SERVICE_H +#define WSREP_EVENT_SERVICE_H + +#include "wsrep_api.h" + +#include <sys/types.h> /* posix size_t */ + +#ifdef __cplusplus +extern "C" +{ +#endif /* __cplusplus */ + +/** + * Type tag for application defined event processing context. + * + * Application may pass pointer to the context when initializing + * the event service. This pointer is passed a first parameter for + * each service call. + */ +typedef struct wsrep_event_context wsrep_event_context_t; + + +/** + * Process an event + * + * @param name event name + * @param value JSON enconded event value + * + * @return void, it is up to the receiver to decide what to do about + * possible error. + */ +typedef void (*wsrep_event_cb_t)(wsrep_event_context_t*, + const char* name, const char* value); + + +/** + * Event service struct. + * + * A pointer to this struct must be passed to the call to + * wsrep_init_event_service_v1. + * + * The application must provide implementation to all functions defined + * in this struct. + */ +typedef struct wsrep_event_service_v1_st +{ + /* Event receiver callback */ + wsrep_event_cb_t event_cb; + /* Pointer to application defined event context. */ + wsrep_event_context_t* context; +} wsrep_event_service_v1_t; + +#ifdef __cplusplus +} +#endif /* __cplusplus */ + + +#define WSREP_EVENT_SERVICE_INIT_FUNC_V1 "wsrep_init_event_service_v1" +#define WSREP_EVENT_SERVICE_DEINIT_FUNC_V1 "wsrep_deinit_event_service_v1" + +#endif /* WSREP_EVENT_SERVICE_H */ + diff --git a/wsrep-lib/wsrep-API/v26/wsrep_gtid.c b/wsrep-lib/wsrep-API/v26/wsrep_gtid.c new file mode 100644 index 00000000..2b6dfb90 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/wsrep_gtid.c @@ -0,0 +1,77 @@ +/* Copyright (C) 2013 Codership Oy <info@codersihp.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +/*! @file Helper functions to deal with GTID string representations */ + +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <inttypes.h> +#include <limits.h> + +#include "wsrep_api.h" + +/*! + * Read GTID from string + * @return length of GTID string representation or -EINVAL in case of error + */ +int +wsrep_gtid_scan(const char* str, size_t str_len, wsrep_gtid_t* gtid) +{ + int offset; + char* endptr; + if (str_len > INT_MAX) return -EINVAL; + + if ((offset = wsrep_uuid_scan(str, str_len, >id->uuid)) > 0 && + offset < (int)str_len && str[offset] == ':') { + ++offset; + if (offset < (int)str_len) + { + errno = 0; + gtid->seqno = strtoll(str + offset, &endptr, 0); + + if (errno == 0) { + return (int)(endptr - str); + } + } + } + *gtid = WSREP_GTID_UNDEFINED; + return -EINVAL; +} + +/*! + * Write GTID to string + * @return length of GTID string representation or -EMSGSIZE if string is too + * short + */ +int +wsrep_gtid_print(const wsrep_gtid_t* gtid, char* str, size_t str_len) +{ + int offset, ret; + if (str_len > INT_MAX) return -EINVAL; + + if ((offset = wsrep_uuid_print(>id->uuid, str, str_len)) > 0) + { + ret = snprintf(str + offset, (size_t)((int)str_len - offset), + ":%" PRId64, gtid->seqno); + if (ret <= ((int)str_len - offset)) { + return (offset + ret); + } + + } + + return -EMSGSIZE; +} diff --git a/wsrep-lib/wsrep-API/v26/wsrep_loader.c b/wsrep-lib/wsrep-API/v26/wsrep_loader.c new file mode 100644 index 00000000..cfa1984f --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/wsrep_loader.c @@ -0,0 +1,239 @@ +/* Copyright (C) 2009-2011 Codership Oy <info@codersihp.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +/*! @file wsrep implementation loader */ + +#include <dlfcn.h> +#include <errno.h> +#include <string.h> +#include <stdio.h> + +#include "wsrep_api.h" + +// Logging stuff for the loader +static const char* log_levels[] = {"FATAL", "ERROR", "WARN", "INFO", "DEBUG"}; + +static void default_logger (wsrep_log_level_t lvl, const char* msg) +{ + fprintf (stderr, "wsrep loader: [%s] %s\n", log_levels[lvl], msg); +} + +static wsrep_log_cb_t logger = default_logger; + +/************************************************************************** + * Library loader + **************************************************************************/ + +static int wsrep_check_iface_version(const char* found, const char* iface_ver) +{ + const size_t msg_len = 128; + char msg[128]; + + if (strcmp(found, iface_ver)) { + snprintf (msg, msg_len, + "provider interface version mismatch: need '%s', found '%s'", + iface_ver, found); + logger (WSREP_LOG_ERROR, msg); + return EINVAL; + } + + return 0; +} + +static int verify(const wsrep_t *wh, const char *iface_ver) +{ + char msg[128]; + const size_t msg_len = sizeof(msg); + +#define VERIFY(_p) if (!(_p)) { \ + snprintf(msg, msg_len, "wsrep_load(): verify(): %s\n", # _p); \ + logger (WSREP_LOG_ERROR, msg); \ + return EINVAL; \ + } + + VERIFY(wh); + VERIFY(wh->version); + + if (wsrep_check_iface_version(wh->version, iface_ver)) + return EINVAL; + + VERIFY(wh->init); + VERIFY(wh->options_set); + VERIFY(wh->options_get); + VERIFY(wh->enc_set_key); + VERIFY(wh->connect); + VERIFY(wh->disconnect); + VERIFY(wh->recv); + VERIFY(wh->assign_read_view); + VERIFY(wh->certify); + VERIFY(wh->commit_order_enter); + VERIFY(wh->commit_order_leave); + VERIFY(wh->release); + VERIFY(wh->replay_trx); + VERIFY(wh->abort_certification); + VERIFY(wh->append_key); + VERIFY(wh->append_data); + VERIFY(wh->free_connection); + VERIFY(wh->to_execute_start); + VERIFY(wh->to_execute_end); + VERIFY(wh->preordered_collect); + VERIFY(wh->preordered_commit); + VERIFY(wh->sst_sent); + VERIFY(wh->sst_received); + VERIFY(wh->stats_get); + VERIFY(wh->stats_free); + VERIFY(wh->stats_reset); + VERIFY(wh->pause); + VERIFY(wh->resume); + VERIFY(wh->desync); + VERIFY(wh->resync); + VERIFY(wh->lock); + VERIFY(wh->unlock); + VERIFY(wh->is_locked); + VERIFY(wh->provider_name); + VERIFY(wh->provider_version); + VERIFY(wh->provider_vendor); + VERIFY(wh->free); + return 0; +} + +typedef int (*wsrep_loader_fun)(wsrep_t*); + +static wsrep_loader_fun wsrep_dlf(void *dlh, const char *sym) +{ + union { + wsrep_loader_fun dlfun; + void *obj; + } alias; + alias.obj = dlsym(dlh, sym); + return alias.dlfun; +} + +static int wsrep_check_version_symbol(void *dlh) +{ + char** dlversion = NULL; + dlversion = (char**) dlsym(dlh, "wsrep_interface_version"); + if (dlversion == NULL) + return 0; + return wsrep_check_iface_version(*dlversion, WSREP_INTERFACE_VERSION); +} + +extern int wsrep_dummy_loader(wsrep_t *w); + +int wsrep_load(const char *spec, wsrep_t **hptr, wsrep_log_cb_t log_cb) +{ + int ret = 0; + void *dlh = NULL; + wsrep_loader_fun dlfun; + char msg[1024]; + const size_t msg_len = sizeof(msg) - 1; + msg[msg_len] = 0; + + if (NULL != log_cb) + logger = log_cb; + + if (!(spec && hptr)) + return EINVAL; + + snprintf (msg, msg_len, + "wsrep_load(): loading provider library '%s'", spec); + logger (WSREP_LOG_INFO, msg); + + if (!(*hptr = malloc(sizeof(wsrep_t)))) { + logger (WSREP_LOG_FATAL, "wsrep_load(): out of memory"); + return ENOMEM; + } + + if (!spec || strcmp(spec, WSREP_NONE) == 0) { + if ((ret = wsrep_dummy_loader(*hptr)) != 0) { + free (*hptr); + *hptr = NULL; + } + return ret; + } + + if (!(dlh = dlopen(spec, RTLD_NOW | RTLD_LOCAL))) { + snprintf(msg, msg_len, "wsrep_load(): dlopen(): %s", dlerror()); + logger (WSREP_LOG_ERROR, msg); + ret = EINVAL; + goto out; + } + + if (!(dlfun = wsrep_dlf(dlh, "wsrep_loader"))) { + ret = EINVAL; + goto out; + } + + if (wsrep_check_version_symbol(dlh) != 0) { + ret = EINVAL; + goto out; + } + + if ((ret = (*dlfun)(*hptr)) != 0) { + snprintf(msg, msg_len, "wsrep_load(): loader failed: %s", + strerror(ret)); + logger (WSREP_LOG_ERROR, msg); + goto out; + } + + if ((ret = verify(*hptr, WSREP_INTERFACE_VERSION)) != 0) { + snprintf (msg, msg_len, + "wsrep_load(): interface version mismatch: my version %s, " + "provider version %s", WSREP_INTERFACE_VERSION, + (*hptr)->version); + logger (WSREP_LOG_ERROR, msg); + goto out; + } + + (*hptr)->dlh = dlh; + +out: + if (ret != 0) { + if (dlh) dlclose(dlh); + free(*hptr); + *hptr = NULL; + } else { + snprintf (msg, msg_len, + "wsrep_load(): %s %s by %s loaded successfully.", + (*hptr)->provider_name, (*hptr)->provider_version, + (*hptr)->provider_vendor); + logger (WSREP_LOG_INFO, msg); + } + + return ret; +} + +void wsrep_unload(wsrep_t *hptr) +{ + if (!hptr) { + logger (WSREP_LOG_WARN, "wsrep_unload(): null pointer."); + } else { + if (hptr->free) + hptr->free(hptr); + if (hptr->dlh) + { + int err; + if ((err = dlclose(hptr->dlh))) + { + char msg[1024]; + snprintf(msg, sizeof(msg), "dlclose(): %s", dlerror()); + msg[sizeof(msg) - 1] = '\0'; + logger(WSREP_LOG_WARN, msg); + } + } + free(hptr); + } +} diff --git a/wsrep-lib/wsrep-API/v26/wsrep_membership_service.h b/wsrep-lib/wsrep-API/v26/wsrep_membership_service.h new file mode 100644 index 00000000..b6053a2f --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/wsrep_membership_service.h @@ -0,0 +1,138 @@ +/* + * Copyright (C) 2020 Codership Oy <info@codership.com> + * + * This file is part of wsrep-API. + * + * Wsrep-API is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-API is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-API. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file wsrep_membership_service.h + * + * This file defines interface for quering the immediate membership and + * members' states of the current configuration. The information is provided + * OUT OF ORDER to facilitate administrative tasks. + * + * The provider which is capable of using the service interface v1 must + * export the following functions. + * + * int wsrep_init_membership_service_v1(struct wsrep_membership_service_v1*) + * void wsrep_deinit_membership_service_v1() + * + * which can be probed by the application. + * + * The application must initialize the service via above init function + * before the provider is initialized via wsrep->init(). The deinit + * function must be called after the provider side resources have been + * released via wsrep->free(). + */ + +#ifndef WSREP_MEMBERSHIP_SERVICE_H +#define WSREP_MEMBERSHIP_SERVICE_H + +#include "wsrep_api.h" + +#ifdef __cplusplus +extern "C" +{ +#endif /* __cplusplus */ + +/** + * Member info structure extended to contain member state + */ +struct wsrep_member_info_ext +{ + struct wsrep_member_info base; + wsrep_seqno_t last_committed; + enum wsrep_member_status status; +}; + +/** + * Extended membership structure + */ +struct wsrep_membership +{ + /** + * Epoch of the membership data (last time it was updated) + */ + wsrep_uuid_t group_uuid; + /** + * Sequence number of the last received (not processed) action + */ + wsrep_seqno_t last_received; + /** + * When the members' data was last updated + */ + wsrep_seqno_t updated; + /** + * Current group state + */ + enum wsrep_view_status state; + /** + * Number of members in the array + */ + size_t num; + /** + * Membership array + */ + struct wsrep_member_info_ext members[1]; +}; + +/** + * Memory allocation callback for wsrep_get_mmebership_fn() below + * + * @param size of buffer to allocate + * @return allocated buffer pointer or NULL in case of error + */ +typedef void* (*wsrep_allocator_cb) (size_t size); + +/** + * Query membership + * + * @param wsrep provider handle + * @param allocator to use for wsrep_membership struct allocation + * @param membership pointer to pointer to the memebrship structure. + * The structure is allocated by provider and must be freed + * by the caller. + * @return error code of the call + */ +typedef wsrep_status_t (*wsrep_get_membership_fn) ( + wsrep_t* wsrep, + wsrep_allocator_cb allocator, + struct wsrep_membership** membership); + +/** + * Membership service struct. + * Returned by WSREP_MEMBERSHIP_SERVICE_INIT_FUNC_V1 + */ +struct wsrep_membership_service_v1 +{ + wsrep_get_membership_fn get_membership; +}; + +#ifdef __cplusplus +} +#endif /* __cplusplus */ + +typedef +wsrep_status_t +(*wsrep_membership_service_v1_init_fn) (struct wsrep_membership_service_v1*); +typedef +void +(*wsrep_membership_service_v1_deinit_fn)(void); + +/** must be exported by the provider */ +#define WSREP_MEMBERSHIP_SERVICE_V1_INIT_FN "wsrep_init_membership_service_v1" +#define WSREP_MEMBERSHIP_SERVICE_V1_DEINIT_FN "wsrep_deinit_membership_service_v1" + +#endif /* WSREP_MEMBERSHIP_SERVICE_H */ diff --git a/wsrep-lib/wsrep-API/v26/wsrep_thread_service.h b/wsrep-lib/wsrep-API/v26/wsrep_thread_service.h new file mode 100644 index 00000000..956751aa --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/wsrep_thread_service.h @@ -0,0 +1,355 @@ +/* + * Copyright (C) 2019 Codership Oy <info@codership.com> + * + * This file is part of wsrep-API. + * + * Wsrep-API is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-API is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-API. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file wsrep_thread_service.h + * + * Service interface for threads, mutexes and condition variables. + * The application which may provide callbacks to routines which will + * be used to manage lifetime and use threads and sycnronization primitives. + * + * The type tags and interface methods are loosely modeled after POSIX + * threading interface. + * + * The application must either none or all of the callbacks defined in + * wsrep_thread_service structure which is defined below. + * + * The error codes returned by the callbacks are generally assumed to + * the system error numbers defined in errno.h unless stated otherwise. + * + * The provider must implement and export the following functions + * to provide initialization point for the service implementation: + * + * Version 1: + * int wsrep_init_thread_service_v1(wsrep_thread_service_v1_t*) + * void wsrep_deinit_thread_service_v1(). + * + * The application defined implementation must be initialized before + * calling the provider initialization function via wsrep->init(). The + * deinitialization must be done via deinit function after the + * provider side resources have been released via wsrep->free(). + */ + +#ifndef WSREP_THREAD_SERVICE_H +#define WSREP_THREAD_SERVICE_H + +#include <stddef.h> /* size_t */ + +#ifdef __cplusplus +extern "C" +{ +#endif /* __cplusplus */ + + /* Forward declarations */ + struct timespec; + struct sched_param; + + /** Thread type tags */ + typedef struct wsrep_thread_key_st wsrep_thread_key_t; + typedef struct wsrep_thread_st wsrep_thread_t; + /** Mutex type tags */ + typedef struct wsrep_mutex_key_st wsrep_mutex_key_t; + typedef struct wsrep_mutex_st wsrep_mutex_t; + /** Condition variable tags */ + typedef struct wsrep_cond_key_st wsrep_cond_key_t; + typedef struct wsrep_cond_st wsrep_cond_t; + + /** + * Create key for a thread with a name. This key object will be passed + * to thread creation and destrunction notification callbacks. + * + * @param name Name of the thread. + */ + typedef const wsrep_thread_key_t* (*wsrep_thread_key_create_cb_t)( + const char* name); + + /** + * Create a new thread. + * + * @param[out] thread Newly allocated thread. + * @param key Key created by wsrep_thread_key_create_cb_t + * @param start_fn Pointer to start routine + * @param arg Argument for start_fn + * + * @return Zero in case of success, non-zero error code in case of failure. + */ + typedef int (*wsrep_thread_create_cb_t)(const wsrep_thread_key_t* key, + wsrep_thread_t** thread, + void* (*start_fn)(void*), + void* arg); + + /** + * Detach a thread. + * + * @param thread Thread to be detached. + * + * @return Zero in case of error, non-zero error code in case of failure. + */ + typedef int (*wsrep_thread_detach_cb_t)(wsrep_thread_t* thread); + + /** + * Compare two threads for equality. + * + * @params t1, t2 Threads to be compared. + * + * @return Non-zero value if threads are equal, zero otherwise. + */ + typedef int (*wsrep_thread_equal_cb_t)(wsrep_thread_t* t1, + wsrep_thread_t* t2); + + /** + * Terminate the calling thread. + * + * @param thread Pointer to thread. + * @param retval Pointer to return value. + * + * This function does not return. + */ + typedef void __attribute__((noreturn)) (*wsrep_thread_exit_cb_t)( + wsrep_thread_t* thread, void* retval); + + /** + * Join a thread. Trying to join detached thread may cause undefined + * behavior. + * + * @param thread Thread to be joined. + * @param[out] retval Return value from the thread wthat was joined. + * + * @return Zero in case of success, non-zero error code in case of error. + */ + typedef int (*wsrep_thread_join_cb_t)(wsrep_thread_t* thread, + void** retval); + + /** + * Return a pointer to the wsrep_thread_t of the calling thread. + * + * @return Pointer to wsrep_thread_t associated with current thread. + */ + typedef wsrep_thread_t* (*wsrep_thread_self_cb_t)(void); + + /** + * Set the scheduling policy for the thread. + * + * @param thread Thread for which sceduing policy should be changed. + * @param policy New scheduling policy for the thread. + * @param param New scheduling parameters for the thread. + */ + typedef int (*wsrep_thread_setschedparam_cb_t)( + wsrep_thread_t* thread, int policy, const struct sched_param* param); + + /** + * Get the current scheduling policy for the thread. + * + * @param thread Thread. + * @param policy Pointer to location where the scheduling policy will + * will be stored in. + * @param Param Pointer to location where the current scheduling + * parameters will be stored. + */ + typedef int (*wsrep_thread_getschedparam_cb_t)(wsrep_thread_t* thread, + int* policy, + struct sched_param* param); + /** + * Create key for a mutex with a name. This key object must be passed + * to mutex creation callback. + * + * @param name Name of the mutex. + * + * @return Const pointer to mutex key. + */ + typedef const wsrep_mutex_key_t* (*wsrep_mutex_key_create_cb_t)( + const char* name); + + /** + * Create a mutex. + * + * @param key Mutex key obtained via wsrep_mutex_key_create_cb call. + * @param memblock Optional memory block allocated by the provider + * which can be used by the implementation to store + * the mutex. + * @param memblock_size Size of the optional memory block. + * + * @return Pointer to wsrep_mutex_t object or NULL in case of failure. + */ + typedef wsrep_mutex_t* (*wsrep_mutex_init_cb_t)( + const wsrep_mutex_key_t* key, void* memblock, size_t memblock_size); + + /** + * Destroy a mutex. This call must consume the mutex object. + * + * @param mutex Mutex to be destroyed. + */ + typedef int (*wsrep_mutex_destroy_cb_t)(wsrep_mutex_t* mutex); + + /** + * Lock a mutex. + * + * @param mutex Mutex to be locked. + * + * @return Zero on success, non-zero error code on error. + */ + typedef int (*wsrep_mutex_lock_cb_t)(wsrep_mutex_t* mutex); + + /** + * Try to lock a mutex. + * + * @param Mutex to be locked. + * + * @return Zero if mutex was successfully locked. + * @return EBUSY if the mutex could not be acquired because it was already + * locked. + * @return Non-zero error code on any other error. + */ + typedef int (*wsrep_mutex_trylock_cb_t)(wsrep_mutex_t* mutex); + + /** + * Unlock a mutex. + * + * @param mutex Mutex to be unlocked. + * + * @return Zero on success, non-zero on error. + */ + typedef int (*wsrep_mutex_unlock_cb_t)(wsrep_mutex_t* mutex); + + /** + * Create key for a condition variable with a name. This key + * must be passed to wsrep_cond_create_cb when creating a new + * condition variable. + * + * @param name Name of the condition variable. + * + * @return Allocated key object. + */ + typedef const wsrep_cond_key_t* (*wsrep_cond_key_create_cb_t)( + const char* name); + + /** + * Create a new condition variable. + * + * @param key Const pointer to key object created by + * wsrep_cond_key_create_cb. + * @param memblock Optional memory block allocated by the provider + * which can be used by the implementation to store + * the mutex. + * @param memblock_size Size of the optional memory block. + * + * @return Pointer to new condition variable. + */ + typedef wsrep_cond_t* (*wsrep_cond_init_cb_t)(const wsrep_cond_key_t* key, + void* memblock, + size_t memblock_size); + + /** + * Destroy a condition variable. This call must consume the condition + * variable object. + * + * @param cond Condition variable to be destroyed. + * + * @return Zero on success, non-zero on error. + */ + typedef int (*wsrep_cond_destroy_cb_t)(wsrep_cond_t* cond); + + /** + * Wait for condition. + * + * @param cond Condition variable to wait for. + * @param mutex Mutex associated to the condition variable. The mutex + * may be unlocked for the duration of the wait. + * + * @return Zero on success, non-zero on error. + */ + typedef int (*wsrep_cond_wait_cb_t)(wsrep_cond_t* cond, + wsrep_mutex_t* mutex); + + /** + * Perform timed wait on condition. + * + * @param cond Condition to wait for. + * @param mutex Mutex associated to the condition variable. The mutex + * may be unlocked for the duration of the wait. + * @param wait_until System time to wait until before returning from the + * the timed wait. + * + * @return Zero on success. + * @return ETIMEDOUT if the time specified by wait_until has passed. + * @return Non-zero error code on other error. + */ + typedef int (*wsrep_cond_timedwait_cb_t)(wsrep_cond_t* cond, + wsrep_mutex_t* mutex, + const struct timespec* wait_until); + + /** + * Signal a condition variable. This will wake up at least one of + * the threads which is waiting for the condition. + * + * @param cond Condition variable to signal. + * + * @return Zero on success, non-zero on failure. + */ + typedef int (*wsrep_cond_signal_cb_t)(wsrep_cond_t* cond); + + /** + * Broadcast a signal to condition variable. This will wake up + * all the threads which are currently waiting on condition variable. + * + * @param cond Condition variable to broadcast the signal to. + * + * @return Zero on success, non-zero on failure. + */ + typedef int (*wsrep_cond_broadcast_cb_t)(wsrep_cond_t* cond); + + typedef struct wsrep_thread_service_v1_st + { + /* Threads */ + wsrep_thread_key_create_cb_t thread_key_create_cb; + wsrep_thread_create_cb_t thread_create_cb; + wsrep_thread_detach_cb_t thread_detach_cb; + wsrep_thread_equal_cb_t thread_equal_cb; + wsrep_thread_exit_cb_t thread_exit_cb; + wsrep_thread_join_cb_t thread_join_cb; + wsrep_thread_self_cb_t thread_self_cb; + wsrep_thread_setschedparam_cb_t thread_setschedparam_cb; + wsrep_thread_getschedparam_cb_t thread_getschedparam_cb; + /* Mutexes */ + wsrep_mutex_key_create_cb_t mutex_key_create_cb; + wsrep_mutex_init_cb_t mutex_init_cb; + wsrep_mutex_destroy_cb_t mutex_destroy_cb; + wsrep_mutex_lock_cb_t mutex_lock_cb; + wsrep_mutex_trylock_cb_t mutex_trylock_cb; + wsrep_mutex_unlock_cb_t mutex_unlock_cb; + /* Condition variables */ + wsrep_cond_key_create_cb_t cond_key_create_cb; + wsrep_cond_init_cb_t cond_init_cb; + wsrep_cond_destroy_cb_t cond_destroy_cb; + wsrep_cond_wait_cb_t cond_wait_cb; + wsrep_cond_timedwait_cb_t cond_timedwait_cb; + wsrep_cond_signal_cb_t cond_signal_cb; + wsrep_cond_broadcast_cb_t cond_broadcast_cb; + } wsrep_thread_service_v1_t; + +#ifdef __cplusplus +} + +#define WSREP_THREAD_SERVICE_INIT_FUNC_V1 "wsrep_init_thread_service_v1" +#define WSREP_THREAD_SERVICE_DEINIT_FUNC_V1 "wsrep_deinit_thread_service_v1" + +/* For backwards compatibility. */ +#define WSREP_THREAD_SERVICE_INIT_FUNC WSREP_THREAD_SERVICE_INIT_FUNC_V1 + +#endif /* __cplusplus */ +#endif /* WSREP_THREAD_SERVICE_H */ diff --git a/wsrep-lib/wsrep-API/v26/wsrep_tls_service.h b/wsrep-lib/wsrep-API/v26/wsrep_tls_service.h new file mode 100644 index 00000000..c632f4aa --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/wsrep_tls_service.h @@ -0,0 +1,326 @@ +/* + * Copyright (C) 2020 Codership Oy <info@codership.com> + * + * This file is part of wsrep-API. + * + * Wsrep-API is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * Wsrep-API is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with wsrep-API. If not, see <https://www.gnu.org/licenses/>. + */ + +/** @file wsrep_tls_service.h + * + * This file defines interface for TLS services provided by the application, + * used by the provider. + * + * In order to support both synchronous and asynchronous IO operations, + * the interface is designed to work with sockets in both blocking + * and non-blockig mode. + * + * The provider is in charge of opening and closing file + * descriptors and connecting transport. After the connection has + * been established, all further IO operations will be delegated + * to the TLS service implementation which is provided by the application. + * + * The provider which is capable of using the service interface v1 must + * export the following functions. + * + * int wsrep_init_tls_service_v1(wsrep_tls_service_v1_t*) + * void wsrep_deinit_tls_service_v1() + * + * which can be probed by the application. + * + * The application must initialize the service via above init function + * before the provider is initialized via wsrep->init(). The deinit + * function must be called after the provider side resources have been + * released via wsrep->free(). + */ + +#ifndef WSREP_TLS_SERVICE_H +#define WSREP_TLS_SERVICE_H + +#include <sys/types.h> /* posix size_t */ + +#ifdef __cplusplus +extern "C" +{ +#endif /* __cplusplus */ + +/** + * Type tag for application defined TLS context. + * + * Application may pass pointer to the context when initializing + * TLS service. This pointer is passed a first parameter for + * each service call. + */ +typedef struct wsrep_tls_context wsrep_tls_context_t; + +/** + * TLS stream structure. + */ +typedef struct wsrep_tls_stream_st +{ + /** + * File descriptor corresponding to the stream. The provider is + * responsible in opening and closing the socket. + */ + int fd; + /** + * Opaque pointer reserved for application use. + */ + void* opaque; +} wsrep_tls_stream_t; + +/** + * Enumeration for return codes. + */ +enum wsrep_tls_result +{ + /** + * The operation completed successfully, no further actions + * are necessary. + */ + wsrep_tls_result_success = 0, + /** + * The operation completed successfully, but the application side wants + * to make further reads. The provider must wait until the stream + * becomes readable and then try the same operation again. + */ + wsrep_tls_result_want_read, + /** + * The operation completed successfully, but the application side wants + * to make further writes. The provider must wait until the stream + * becomes writable and then try the same operation again. + */ + wsrep_tls_result_want_write, + /** + * End of file was read from the stream. This result is needed to + * make difference between graceful stream shutdown and zero length + * reads which result from errors. + */ + wsrep_tls_result_eof, + /** + * An error occurred. The specific error reason must be + * queried with wsrep_tls_stream_get_error_number and + * wsrep_tls_stream_get_error_category. + */ + wsrep_tls_result_error +}; + +/** + * Initialize a new TLS stream. + * + * Initialize the stream for IO operations. During this call the + * application must set up all of the data structures needed for + * IO, but must not do any reads or writes into the stream yet. + * + * @param stream TLS stream to be initialized. + * + * @return Zero on success, system error number on error. + */ +typedef int (*wsrep_tls_stream_init_t)(wsrep_tls_context_t*, + wsrep_tls_stream_t* stream); + +/** + * Deinitialize the TLS stream. + * + * Deinitialize the TLS stream and free all allocated resources. + * Note that this function must not close the socket file descriptor + * associated the the stream. + * + * @param stream Stream to be deinitialized. + */ +typedef void (*wsrep_tls_stream_deinit_t)(wsrep_tls_context_t*, + wsrep_tls_stream_t* stream); + +/** + * Get error number of the last stream error. The error numbers are + * defined by the application and must be integral type. By the convention + * zero value must denote success. + * + * For managing errors other than system errors, the application may + * provide several error categories via wsrep_tls_stream_get_error_category_t. + * + * @param stream TLS stream to get the last error from. + * + * @return Error number. + */ +typedef int (*wsrep_tls_stream_get_error_number_t)( + wsrep_tls_context_t*, + const wsrep_tls_stream_t* stream); + +/** + * Get the error category of the last stream error. + * + * The category is represented via a const void pointer to the provider. + * If the category is NULL pointer, the error number is assumed to be + * system error. + * + * @param stream Stream to get last error category from. + * + * @return Pointer to error category. + */ +typedef const void* (*wsrep_tls_stream_get_error_category_t)( + wsrep_tls_context_t*, + const wsrep_tls_stream_t* stream); + +/** + * Return human readable error message by error number and error + * category. + * + * The message string returned by the application must contain only + * printable characters and must be null terminated. + * + * @param error_number Error number returned by + * wsrep_tls_stream_get_error_number_t. + * @param category Error category returned by + * wsrep_tls_stream_get_error_category_t. + * + * @return Human readable message string. + */ +typedef const char* (*wsrep_tls_error_message_get_t)( + wsrep_tls_context_t*, + const wsrep_tls_stream_t* stream, + int error_number, const void* category); + +/** + * Initiate TLS client side handshake. This function is called for the + * stream sockets which have been connected by the provider. + * + * If the stream socket is in non-blocking mode, the call should return + * immediately with appropriate result indicating if more actions are needed + * in the case the operation would block. The provider will call this function + * again until either a success or an error is returned. + * + * @param stream TLS stream. + * + * @return Enum wsrep_tls_result. + */ +typedef enum wsrep_tls_result (*wsrep_tls_stream_client_handshake_t)( + wsrep_tls_context_t*, + wsrep_tls_stream_t* stream); + +/** + * Initiate TLS server side handshake. This function is called for stream + * sockets which have been accepted by the provider. + * + * If the stream socket is in non-blocking mode, the call should return + * immediately with appropriate result indicating if more actions are needed + * in the case the operation would block. The provider will call this function + * again until either a success or an error is returned. + * + * @param stream TLS stream. + * + * @return Enum wsrep_tls_result. + */ +typedef enum wsrep_tls_result (*wsrep_tls_stream_server_handshake_t)( + wsrep_tls_context_t*, + wsrep_tls_stream_t* stream); + +/** + * Perform a read from the stream. If the file descriptor associated + * to the stream is in non-blocking mode, the call must return immediately + * with appropriate result if the stream processing would block. + * + * @param[in] stream TLS stream. + * @param[in] buf Buffer to read the data into. + * @param[in] max_count Maximum number of bytes to read. + * @param[out] bytes_transferred Number of bytes read into the buffer during + * the operation. + * + * @return Enum wsrep_tls_result. + */ +typedef enum wsrep_tls_result (*wsrep_tls_stream_read_t)( + wsrep_tls_context_t*, + wsrep_tls_stream_t* stream, + void* buf, + size_t max_count, + size_t* bytes_transferred); + +/** + * Perform a write to the stream. If the file descriptor asociated to + * te stream is in non-blocking mode, the call must return immediately + * with appropriate result if the stream processing would block. + * + * @param[in] stream TLS stream. + * @param[in] buf Buffer which contains the data to write. + * @param[in] count Number of bytes to be written. + * @param[out] bytes_transferred Number of bytes written into the stream + * during the opration. + * + * @return Enum wsrep_tls_result. + */ +typedef enum wsrep_tls_result (*wsrep_tls_stream_write_t)( + wsrep_tls_context_t*, + wsrep_tls_stream_t* stream, + const void* buf, + size_t count, + size_t* bytes_transferred); + +/** + * Shutdown the TLS stream. + * + * Note that the implementation must not close the associated stream + * socket, just shut down the protocol. + * + * If the shutdown call returns either wsrep_result_want_read or + * wsrep_result_want_write, the provider must wait until the socket + * becomes readable or writable and then call the function again + * until the return status is either success or an error occurs. + * + * @param stream TLS stream to be shut down. + * + * @return Enum wsrep_tls_result code. + * + */ +typedef enum wsrep_tls_result (*wsrep_tls_stream_shutdown_t)( + wsrep_tls_context_t*, + wsrep_tls_stream_t* stream); + +/** + * TLS service struct. + * + * A pointer to this struct must be passed to the call to + * wsrep_init_tls_service_v1. + * + * The application must provide implementation to all functions defined + * in this struct. + */ +typedef struct wsrep_tls_service_v1_st +{ + /* Stream */ + wsrep_tls_stream_init_t stream_init; + wsrep_tls_stream_deinit_t stream_deinit; + wsrep_tls_stream_get_error_number_t stream_get_error_number; + wsrep_tls_stream_get_error_category_t stream_get_error_category; + wsrep_tls_stream_client_handshake_t stream_client_handshake; + wsrep_tls_stream_server_handshake_t stream_server_handshake; + wsrep_tls_stream_read_t stream_read; + wsrep_tls_stream_write_t stream_write; + wsrep_tls_stream_shutdown_t stream_shutdown; + /* Error */ + wsrep_tls_error_message_get_t error_message_get; + /* Pointer to application defined TLS context. */ + wsrep_tls_context_t* context; +} wsrep_tls_service_v1_t; + + +#ifdef __cplusplus +} +#endif /* __cplusplus */ + + +#define WSREP_TLS_SERVICE_INIT_FUNC_V1 "wsrep_init_tls_service_v1" +#define WSREP_TLS_SERVICE_DEINIT_FUNC_V1 "wsrep_deinit_tls_service_v1" + +#endif /* WSREP_TLS_SERVICE_H */ + diff --git a/wsrep-lib/wsrep-API/v26/wsrep_uuid.c b/wsrep-lib/wsrep-API/v26/wsrep_uuid.c new file mode 100644 index 00000000..3ac2ca91 --- /dev/null +++ b/wsrep-lib/wsrep-API/v26/wsrep_uuid.c @@ -0,0 +1,94 @@ +/* Copyright (C) 2009 Codership Oy <info@codersihp.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +/*! @file Helper functions to deal with history UUID string representations */ + +#include <errno.h> +#include <ctype.h> +#include <stdio.h> +#include <string.h> + +#include "wsrep_api.h" + +/*! + * Read UUID from string + * @return length of UUID string representation or -EINVAL in case of error + */ +int +wsrep_uuid_scan (const char* str, size_t str_len, wsrep_uuid_t* uuid) +{ + unsigned int uuid_len = 0; + unsigned int uuid_offt = 0; + + while (uuid_len + 1 < str_len) { + /* We are skipping potential '-' after uuid_offt == 4, 6, 8, 10 + * which means + * (uuid_offt >> 1) == 2, 3, 4, 5, + * which in turn means + * (uuid_offt >> 1) - 2 <= 3 + * since it is always >= 0, because uuid_offt is unsigned */ + if (((uuid_offt >> 1) - 2) <= 3 && str[uuid_len] == '-') { + // skip dashes after 4th, 6th, 8th and 10th positions + uuid_len += 1; + continue; + } + + if (isxdigit(str[uuid_len]) && isxdigit(str[uuid_len + 1])) { + // got hex digit, scan another byte to uuid, increment uuid_offt + sscanf (str + uuid_len, "%2hhx", uuid->data + uuid_offt); + uuid_len += 2; + uuid_offt += 1; + if (sizeof (uuid->data) == uuid_offt) + return (int)uuid_len; + } + else { + break; + } + } + + *uuid = WSREP_UUID_UNDEFINED; + return -EINVAL; +} + +/*! + * Write UUID to string + * @return length of UUID string representation or -EMSGSIZE if string is too + * short + */ +int +wsrep_uuid_print (const wsrep_uuid_t* uuid, char* str, size_t str_len) +{ + if (str_len > 36) { + const unsigned char* u = uuid->data; + return snprintf(str, str_len, "%02x%02x%02x%02x-%02x%02x-%02x%02x-" + "%02x%02x-%02x%02x%02x%02x%02x%02x", + u[ 0], u[ 1], u[ 2], u[ 3], u[ 4], u[ 5], u[ 6], u[ 7], + u[ 8], u[ 9], u[10], u[11], u[12], u[13], u[14], u[15]); + } + else { + return -EMSGSIZE; + } +} + +/*! + * Compare two UUIDs + * @return -1, 0, 1 if lhs is respectively smaller, equal, or greater than rhs + */ +int +wsrep_uuid_compare (const wsrep_uuid_t* lhs, const wsrep_uuid_t* rhs) +{ + return memcmp(lhs, rhs, sizeof(wsrep_uuid_t)); +} |