summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2020-04-19 14:56:41 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2020-04-19 14:56:41 +0000
commit7f743e3be67463888d4a929b782e7ce5f57592ac (patch)
tree649e99e34a15cece1c603b0d7dbea2dad219ac57
parentInitial commit. (diff)
downloadnetdata-go-orchestrator-7f743e3be67463888d4a929b782e7ce5f57592ac.tar.xz
netdata-go-orchestrator-7f743e3be67463888d4a929b782e7ce5f57592ac.zip
Adding upstream version 0+20200312.upstream/0+20200312upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
-rw-r--r--.circleci/config.yml49
-rw-r--r--.gitignore1
-rw-r--r--LICENSE674
-rw-r--r--README.md159
-rw-r--r--cli/cli.go39
-rw-r--r--create.go221
-rw-r--r--create_test.go236
-rw-r--r--examples/config/module.conf66
-rw-r--r--examples/config/plugin.conf15
-rw-r--r--examples/simple/main.go85
-rw-r--r--go.mod11
-rw-r--r--go.sum19
-rw-r--r--logger/countwatcher.go79
-rw-r--r--logger/countwatcher_test.go57
-rw-r--r--logger/formatter.go198
-rw-r--r--logger/formatter_test.go32
-rw-r--r--logger/logger.go200
-rw-r--r--logger/logger_test.go212
-rw-r--r--logger/severity.go53
-rw-r--r--logger/static.go99
-rw-r--r--mock.go94
-rw-r--r--mock_test.go87
-rw-r--r--module/charts.go442
-rw-r--r--module/charts_test.go374
-rw-r--r--module/job.go423
-rw-r--r--module/job_test.go311
-rw-r--r--module/mock.go53
-rw-r--r--module/mock_test.go52
-rw-r--r--module/module.go37
-rw-r--r--module/netdataapi.go89
-rw-r--r--module/netdataapi_test.go151
-rw-r--r--module/registry.go37
-rw-r--r--module/registry_test.go32
-rw-r--r--orchestrator.go224
-rw-r--r--orchestrator_test.go184
-rw-r--r--pkg/multipath/multipath.go57
-rw-r--r--pkg/multipath/multipath_test.go37
-rw-r--r--pkg/multipath/tests/test-empty.conf0
-rw-r--r--pkg/multipath/tests/test.conf1
-rw-r--r--save.go132
-rw-r--r--save_test.go25
-rw-r--r--setup.go97
-rw-r--r--setup_test.go160
-rw-r--r--start.go69
-rw-r--r--start_test.go27
-rw-r--r--testdata/god-jobs-statuses.json5
-rw-r--r--testdata/test.d.conf-broken.yml8
-rw-r--r--testdata/test.d.conf-disabled.yml6
-rw-r--r--testdata/test.d.conf-empty.yml1
-rw-r--r--testdata/test.d.conf-invalid-modules.yml6
-rw-r--r--testdata/test.d.conf.yml6
-rw-r--r--testdata/test.d/module-broken.conf8
-rw-r--r--testdata/test.d/module-no-jobs.conf4
-rw-r--r--testdata/test.d/module1.conf7
-rw-r--r--testdata/test.d/module2.conf6
-rw-r--r--ticker.go53
-rw-r--r--ticker_test.go47
57 files changed, 5857 insertions, 0 deletions
diff --git a/.circleci/config.yml b/.circleci/config.yml
new file mode 100644
index 0000000..474d956
--- /dev/null
+++ b/.circleci/config.yml
@@ -0,0 +1,49 @@
+version: 2
+jobs:
+ compile:
+ docker:
+ - image: circleci/golang:1.13
+ steps:
+ - checkout
+ - restore_cache:
+ keys:
+ - go_mod-{{ checksum "go.mod" }}-{{ checksum "go.sum" }}
+ - run: go get -t -v -d ./...
+ - save_cache:
+ key: go_mod-{{ checksum "go.mod" }}-{{ checksum "go.sum" }}
+ paths:
+ - /go/pkg/mod
+ - run: CGO_ENABLED=0 go build -o /tmp/godplugin github.com/netdata/go-orchestrator/examples/simple
+ - run: /tmp/godplugin --help || true
+ - store_artifacts:
+ path: /tmp/godplugin
+ vet:
+ docker:
+ - image: circleci/golang:1.13
+ steps:
+ - checkout
+ - restore_cache:
+ keys:
+ - go_mod-{{ checksum "go.mod" }}-{{ checksum "go.sum" }}
+ - run: go vet ./...
+ test:
+ docker:
+ - image: circleci/golang:1.13
+ steps:
+ - checkout
+ - restore_cache:
+ keys:
+ - go_mod-{{ checksum "go.mod" }}-{{ checksum "go.sum" }}
+ - run: go test ./... -coverprofile=coverage.txt -race -cover -covermode=atomic
+
+workflows:
+ version: 2
+ build_and_test:
+ jobs:
+ - compile
+ - vet:
+ requires:
+ - compile
+ - test:
+ requires:
+ - compile
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f56da36
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+/examples/simple/simple \ No newline at end of file
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..f288702
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,674 @@
+ GNU GENERAL PUBLIC LICENSE
+ Version 3, 29 June 2007
+
+ Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/>
+ Everyone is permitted to copy and distribute verbatim copies
+ of this license document, but changing it is not allowed.
+
+ Preamble
+
+ The GNU General Public License is a free, copyleft license for
+software and other kinds of works.
+
+ The licenses for most software and other practical works are designed
+to take away your freedom to share and change the works. By contrast,
+the GNU General Public License is intended to guarantee your freedom to
+share and change all versions of a program--to make sure it remains free
+software for all its users. We, the Free Software Foundation, use the
+GNU General Public License for most of our software; it applies also to
+any other work released this way by its authors. You can apply it to
+your programs, too.
+
+ When we speak of free software, we are referring to freedom, not
+price. Our General Public Licenses are designed to make sure that you
+have the freedom to distribute copies of free software (and charge for
+them if you wish), that you receive source code or can get it if you
+want it, that you can change the software or use pieces of it in new
+free programs, and that you know you can do these things.
+
+ To protect your rights, we need to prevent others from denying you
+these rights or asking you to surrender the rights. Therefore, you have
+certain responsibilities if you distribute copies of the software, or if
+you modify it: responsibilities to respect the freedom of others.
+
+ For example, if you distribute copies of such a program, whether
+gratis or for a fee, you must pass on to the recipients the same
+freedoms that you received. You must make sure that they, too, receive
+or can get the source code. And you must show them these terms so they
+know their rights.
+
+ Developers that use the GNU GPL protect your rights with two steps:
+(1) assert copyright on the software, and (2) offer you this License
+giving you legal permission to copy, distribute and/or modify it.
+
+ For the developers' and authors' protection, the GPL clearly explains
+that there is no warranty for this free software. For both users' and
+authors' sake, the GPL requires that modified versions be marked as
+changed, so that their problems will not be attributed erroneously to
+authors of previous versions.
+
+ Some devices are designed to deny users access to install or run
+modified versions of the software inside them, although the manufacturer
+can do so. This is fundamentally incompatible with the aim of
+protecting users' freedom to change the software. The systematic
+pattern of such abuse occurs in the area of products for individuals to
+use, which is precisely where it is most unacceptable. Therefore, we
+have designed this version of the GPL to prohibit the practice for those
+products. If such problems arise substantially in other domains, we
+stand ready to extend this provision to those domains in future versions
+of the GPL, as needed to protect the freedom of users.
+
+ Finally, every program is threatened constantly by software patents.
+States should not allow patents to restrict development and use of
+software on general-purpose computers, but in those that do, we wish to
+avoid the special danger that patents applied to a free program could
+make it effectively proprietary. To prevent this, the GPL assures that
+patents cannot be used to render the program non-free.
+
+ The precise terms and conditions for copying, distribution and
+modification follow.
+
+ TERMS AND CONDITIONS
+
+ 0. Definitions.
+
+ "This License" refers to version 3 of the GNU General Public License.
+
+ "Copyright" also means copyright-like laws that apply to other kinds of
+works, such as semiconductor masks.
+
+ "The Program" refers to any copyrightable work licensed under this
+License. Each licensee is addressed as "you". "Licensees" and
+"recipients" may be individuals or organizations.
+
+ To "modify" a work means to copy from or adapt all or part of the work
+in a fashion requiring copyright permission, other than the making of an
+exact copy. The resulting work is called a "modified version" of the
+earlier work or a work "based on" the earlier work.
+
+ A "covered work" means either the unmodified Program or a work based
+on the Program.
+
+ To "propagate" a work means to do anything with it that, without
+permission, would make you directly or secondarily liable for
+infringement under applicable copyright law, except executing it on a
+computer or modifying a private copy. Propagation includes copying,
+distribution (with or without modification), making available to the
+public, and in some countries other activities as well.
+
+ To "convey" a work means any kind of propagation that enables other
+parties to make or receive copies. Mere interaction with a user through
+a computer network, with no transfer of a copy, is not conveying.
+
+ An interactive user interface displays "Appropriate Legal Notices"
+to the extent that it includes a convenient and prominently visible
+feature that (1) displays an appropriate copyright notice, and (2)
+tells the user that there is no warranty for the work (except to the
+extent that warranties are provided), that licensees may convey the
+work under this License, and how to view a copy of this License. If
+the interface presents a list of user commands or options, such as a
+menu, a prominent item in the list meets this criterion.
+
+ 1. Source Code.
+
+ The "source code" for a work means the preferred form of the work
+for making modifications to it. "Object code" means any non-source
+form of a work.
+
+ A "Standard Interface" means an interface that either is an official
+standard defined by a recognized standards body, or, in the case of
+interfaces specified for a particular programming language, one that
+is widely used among developers working in that language.
+
+ The "System Libraries" of an executable work include anything, other
+than the work as a whole, that (a) is included in the normal form of
+packaging a Major Component, but which is not part of that Major
+Component, and (b) serves only to enable use of the work with that
+Major Component, or to implement a Standard Interface for which an
+implementation is available to the public in source code form. A
+"Major Component", in this context, means a major essential component
+(kernel, window system, and so on) of the specific operating system
+(if any) on which the executable work runs, or a compiler used to
+produce the work, or an object code interpreter used to run it.
+
+ The "Corresponding Source" for a work in object code form means all
+the source code needed to generate, install, and (for an executable
+work) run the object code and to modify the work, including scripts to
+control those activities. However, it does not include the work's
+System Libraries, or general-purpose tools or generally available free
+programs which are used unmodified in performing those activities but
+which are not part of the work. For example, Corresponding Source
+includes interface definition files associated with source files for
+the work, and the source code for shared libraries and dynamically
+linked subprograms that the work is specifically designed to require,
+such as by intimate data communication or control flow between those
+subprograms and other parts of the work.
+
+ The Corresponding Source need not include anything that users
+can regenerate automatically from other parts of the Corresponding
+Source.
+
+ The Corresponding Source for a work in source code form is that
+same work.
+
+ 2. Basic Permissions.
+
+ All rights granted under this License are granted for the term of
+copyright on the Program, and are irrevocable provided the stated
+conditions are met. This License explicitly affirms your unlimited
+permission to run the unmodified Program. The output from running a
+covered work is covered by this License only if the output, given its
+content, constitutes a covered work. This License acknowledges your
+rights of fair use or other equivalent, as provided by copyright law.
+
+ You may make, run and propagate covered works that you do not
+convey, without conditions so long as your license otherwise remains
+in force. You may convey covered works to others for the sole purpose
+of having them make modifications exclusively for you, or provide you
+with facilities for running those works, provided that you comply with
+the terms of this License in conveying all material for which you do
+not control copyright. Those thus making or running the covered works
+for you must do so exclusively on your behalf, under your direction
+and control, on terms that prohibit them from making any copies of
+your copyrighted material outside their relationship with you.
+
+ Conveying under any other circumstances is permitted solely under
+the conditions stated below. Sublicensing is not allowed; section 10
+makes it unnecessary.
+
+ 3. Protecting Users' Legal Rights From Anti-Circumvention Law.
+
+ No covered work shall be deemed part of an effective technological
+measure under any applicable law fulfilling obligations under article
+11 of the WIPO copyright treaty adopted on 20 December 1996, or
+similar laws prohibiting or restricting circumvention of such
+measures.
+
+ When you convey a covered work, you waive any legal power to forbid
+circumvention of technological measures to the extent such circumvention
+is effected by exercising rights under this License with respect to
+the covered work, and you disclaim any intention to limit operation or
+modification of the work as a means of enforcing, against the work's
+users, your or third parties' legal rights to forbid circumvention of
+technological measures.
+
+ 4. Conveying Verbatim Copies.
+
+ You may convey verbatim copies of the Program's source code as you
+receive it, in any medium, provided that you conspicuously and
+appropriately publish on each copy an appropriate copyright notice;
+keep intact all notices stating that this License and any
+non-permissive terms added in accord with section 7 apply to the code;
+keep intact all notices of the absence of any warranty; and give all
+recipients a copy of this License along with the Program.
+
+ You may charge any price or no price for each copy that you convey,
+and you may offer support or warranty protection for a fee.
+
+ 5. Conveying Modified Source Versions.
+
+ You may convey a work based on the Program, or the modifications to
+produce it from the Program, in the form of source code under the
+terms of section 4, provided that you also meet all of these conditions:
+
+ a) The work must carry prominent notices stating that you modified
+ it, and giving a relevant date.
+
+ b) The work must carry prominent notices stating that it is
+ released under this License and any conditions added under section
+ 7. This requirement modifies the requirement in section 4 to
+ "keep intact all notices".
+
+ c) You must license the entire work, as a whole, under this
+ License to anyone who comes into possession of a copy. This
+ License will therefore apply, along with any applicable section 7
+ additional terms, to the whole of the work, and all its parts,
+ regardless of how they are packaged. This License gives no
+ permission to license the work in any other way, but it does not
+ invalidate such permission if you have separately received it.
+
+ d) If the work has interactive user interfaces, each must display
+ Appropriate Legal Notices; however, if the Program has interactive
+ interfaces that do not display Appropriate Legal Notices, your
+ work need not make them do so.
+
+ A compilation of a covered work with other separate and independent
+works, which are not by their nature extensions of the covered work,
+and which are not combined with it such as to form a larger program,
+in or on a volume of a storage or distribution medium, is called an
+"aggregate" if the compilation and its resulting copyright are not
+used to limit the access or legal rights of the compilation's users
+beyond what the individual works permit. Inclusion of a covered work
+in an aggregate does not cause this License to apply to the other
+parts of the aggregate.
+
+ 6. Conveying Non-Source Forms.
+
+ You may convey a covered work in object code form under the terms
+of sections 4 and 5, provided that you also convey the
+machine-readable Corresponding Source under the terms of this License,
+in one of these ways:
+
+ a) Convey the object code in, or embodied in, a physical product
+ (including a physical distribution medium), accompanied by the
+ Corresponding Source fixed on a durable physical medium
+ customarily used for software interchange.
+
+ b) Convey the object code in, or embodied in, a physical product
+ (including a physical distribution medium), accompanied by a
+ written offer, valid for at least three years and valid for as
+ long as you offer spare parts or customer support for that product
+ model, to give anyone who possesses the object code either (1) a
+ copy of the Corresponding Source for all the software in the
+ product that is covered by this License, on a durable physical
+ medium customarily used for software interchange, for a price no
+ more than your reasonable cost of physically performing this
+ conveying of source, or (2) access to copy the
+ Corresponding Source from a network server at no charge.
+
+ c) Convey individual copies of the object code with a copy of the
+ written offer to provide the Corresponding Source. This
+ alternative is allowed only occasionally and noncommercially, and
+ only if you received the object code with such an offer, in accord
+ with subsection 6b.
+
+ d) Convey the object code by offering access from a designated
+ place (gratis or for a charge), and offer equivalent access to the
+ Corresponding Source in the same way through the same place at no
+ further charge. You need not require recipients to copy the
+ Corresponding Source along with the object code. If the place to
+ copy the object code is a network server, the Corresponding Source
+ may be on a different server (operated by you or a third party)
+ that supports equivalent copying facilities, provided you maintain
+ clear directions next to the object code saying where to find the
+ Corresponding Source. Regardless of what server hosts the
+ Corresponding Source, you remain obligated to ensure that it is
+ available for as long as needed to satisfy these requirements.
+
+ e) Convey the object code using peer-to-peer transmission, provided
+ you inform other peers where the object code and Corresponding
+ Source of the work are being offered to the general public at no
+ charge under subsection 6d.
+
+ A separable portion of the object code, whose source code is excluded
+from the Corresponding Source as a System Library, need not be
+included in conveying the object code work.
+
+ A "User Product" is either (1) a "consumer product", which means any
+tangible personal property which is normally used for personal, family,
+or household purposes, or (2) anything designed or sold for incorporation
+into a dwelling. In determining whether a product is a consumer product,
+doubtful cases shall be resolved in favor of coverage. For a particular
+product received by a particular user, "normally used" refers to a
+typical or common use of that class of product, regardless of the status
+of the particular user or of the way in which the particular user
+actually uses, or expects or is expected to use, the product. A product
+is a consumer product regardless of whether the product has substantial
+commercial, industrial or non-consumer uses, unless such uses represent
+the only significant mode of use of the product.
+
+ "Installation Information" for a User Product means any methods,
+procedures, authorization keys, or other information required to install
+and execute modified versions of a covered work in that User Product from
+a modified version of its Corresponding Source. The information must
+suffice to ensure that the continued functioning of the modified object
+code is in no case prevented or interfered with solely because
+modification has been made.
+
+ If you convey an object code work under this section in, or with, or
+specifically for use in, a User Product, and the conveying occurs as
+part of a transaction in which the right of possession and use of the
+User Product is transferred to the recipient in perpetuity or for a
+fixed term (regardless of how the transaction is characterized), the
+Corresponding Source conveyed under this section must be accompanied
+by the Installation Information. But this requirement does not apply
+if neither you nor any third party retains the ability to install
+modified object code on the User Product (for example, the work has
+been installed in ROM).
+
+ The requirement to provide Installation Information does not include a
+requirement to continue to provide support service, warranty, or updates
+for a work that has been modified or installed by the recipient, or for
+the User Product in which it has been modified or installed. Access to a
+network may be denied when the modification itself materially and
+adversely affects the operation of the network or violates the rules and
+protocols for communication across the network.
+
+ Corresponding Source conveyed, and Installation Information provided,
+in accord with this section must be in a format that is publicly
+documented (and with an implementation available to the public in
+source code form), and must require no special password or key for
+unpacking, reading or copying.
+
+ 7. Additional Terms.
+
+ "Additional permissions" are terms that supplement the terms of this
+License by making exceptions from one or more of its conditions.
+Additional permissions that are applicable to the entire Program shall
+be treated as though they were included in this License, to the extent
+that they are valid under applicable law. If additional permissions
+apply only to part of the Program, that part may be used separately
+under those permissions, but the entire Program remains governed by
+this License without regard to the additional permissions.
+
+ When you convey a copy of a covered work, you may at your option
+remove any additional permissions from that copy, or from any part of
+it. (Additional permissions may be written to require their own
+removal in certain cases when you modify the work.) You may place
+additional permissions on material, added by you to a covered work,
+for which you have or can give appropriate copyright permission.
+
+ Notwithstanding any other provision of this License, for material you
+add to a covered work, you may (if authorized by the copyright holders of
+that material) supplement the terms of this License with terms:
+
+ a) Disclaiming warranty or limiting liability differently from the
+ terms of sections 15 and 16 of this License; or
+
+ b) Requiring preservation of specified reasonable legal notices or
+ author attributions in that material or in the Appropriate Legal
+ Notices displayed by works containing it; or
+
+ c) Prohibiting misrepresentation of the origin of that material, or
+ requiring that modified versions of such material be marked in
+ reasonable ways as different from the original version; or
+
+ d) Limiting the use for publicity purposes of names of licensors or
+ authors of the material; or
+
+ e) Declining to grant rights under trademark law for use of some
+ trade names, trademarks, or service marks; or
+
+ f) Requiring indemnification of licensors and authors of that
+ material by anyone who conveys the material (or modified versions of
+ it) with contractual assumptions of liability to the recipient, for
+ any liability that these contractual assumptions directly impose on
+ those licensors and authors.
+
+ All other non-permissive additional terms are considered "further
+restrictions" within the meaning of section 10. If the Program as you
+received it, or any part of it, contains a notice stating that it is
+governed by this License along with a term that is a further
+restriction, you may remove that term. If a license document contains
+a further restriction but permits relicensing or conveying under this
+License, you may add to a covered work material governed by the terms
+of that license document, provided that the further restriction does
+not survive such relicensing or conveying.
+
+ If you add terms to a covered work in accord with this section, you
+must place, in the relevant source files, a statement of the
+additional terms that apply to those files, or a notice indicating
+where to find the applicable terms.
+
+ Additional terms, permissive or non-permissive, may be stated in the
+form of a separately written license, or stated as exceptions;
+the above requirements apply either way.
+
+ 8. Termination.
+
+ You may not propagate or modify a covered work except as expressly
+provided under this License. Any attempt otherwise to propagate or
+modify it is void, and will automatically terminate your rights under
+this License (including any patent licenses granted under the third
+paragraph of section 11).
+
+ However, if you cease all violation of this License, then your
+license from a particular copyright holder is reinstated (a)
+provisionally, unless and until the copyright holder explicitly and
+finally terminates your license, and (b) permanently, if the copyright
+holder fails to notify you of the violation by some reasonable means
+prior to 60 days after the cessation.
+
+ Moreover, your license from a particular copyright holder is
+reinstated permanently if the copyright holder notifies you of the
+violation by some reasonable means, this is the first time you have
+received notice of violation of this License (for any work) from that
+copyright holder, and you cure the violation prior to 30 days after
+your receipt of the notice.
+
+ Termination of your rights under this section does not terminate the
+licenses of parties who have received copies or rights from you under
+this License. If your rights have been terminated and not permanently
+reinstated, you do not qualify to receive new licenses for the same
+material under section 10.
+
+ 9. Acceptance Not Required for Having Copies.
+
+ You are not required to accept this License in order to receive or
+run a copy of the Program. Ancillary propagation of a covered work
+occurring solely as a consequence of using peer-to-peer transmission
+to receive a copy likewise does not require acceptance. However,
+nothing other than this License grants you permission to propagate or
+modify any covered work. These actions infringe copyright if you do
+not accept this License. Therefore, by modifying or propagating a
+covered work, you indicate your acceptance of this License to do so.
+
+ 10. Automatic Licensing of Downstream Recipients.
+
+ Each time you convey a covered work, the recipient automatically
+receives a license from the original licensors, to run, modify and
+propagate that work, subject to this License. You are not responsible
+for enforcing compliance by third parties with this License.
+
+ An "entity transaction" is a transaction transferring control of an
+organization, or substantially all assets of one, or subdividing an
+organization, or merging organizations. If propagation of a covered
+work results from an entity transaction, each party to that
+transaction who receives a copy of the work also receives whatever
+licenses to the work the party's predecessor in interest had or could
+give under the previous paragraph, plus a right to possession of the
+Corresponding Source of the work from the predecessor in interest, if
+the predecessor has it or can get it with reasonable efforts.
+
+ You may not impose any further restrictions on the exercise of the
+rights granted or affirmed under this License. For example, you may
+not impose a license fee, royalty, or other charge for exercise of
+rights granted under this License, and you may not initiate litigation
+(including a cross-claim or counterclaim in a lawsuit) alleging that
+any patent claim is infringed by making, using, selling, offering for
+sale, or importing the Program or any portion of it.
+
+ 11. Patents.
+
+ A "contributor" is a copyright holder who authorizes use under this
+License of the Program or a work on which the Program is based. The
+work thus licensed is called the contributor's "contributor version".
+
+ A contributor's "essential patent claims" are all patent claims
+owned or controlled by the contributor, whether already acquired or
+hereafter acquired, that would be infringed by some manner, permitted
+by this License, of making, using, or selling its contributor version,
+but do not include claims that would be infringed only as a
+consequence of further modification of the contributor version. For
+purposes of this definition, "control" includes the right to grant
+patent sublicenses in a manner consistent with the requirements of
+this License.
+
+ Each contributor grants you a non-exclusive, worldwide, royalty-free
+patent license under the contributor's essential patent claims, to
+make, use, sell, offer for sale, import and otherwise run, modify and
+propagate the contents of its contributor version.
+
+ In the following three paragraphs, a "patent license" is any express
+agreement or commitment, however denominated, not to enforce a patent
+(such as an express permission to practice a patent or covenant not to
+sue for patent infringement). To "grant" such a patent license to a
+party means to make such an agreement or commitment not to enforce a
+patent against the party.
+
+ If you convey a covered work, knowingly relying on a patent license,
+and the Corresponding Source of the work is not available for anyone
+to copy, free of charge and under the terms of this License, through a
+publicly available network server or other readily accessible means,
+then you must either (1) cause the Corresponding Source to be so
+available, or (2) arrange to deprive yourself of the benefit of the
+patent license for this particular work, or (3) arrange, in a manner
+consistent with the requirements of this License, to extend the patent
+license to downstream recipients. "Knowingly relying" means you have
+actual knowledge that, but for the patent license, your conveying the
+covered work in a country, or your recipient's use of the covered work
+in a country, would infringe one or more identifiable patents in that
+country that you have reason to believe are valid.
+
+ If, pursuant to or in connection with a single transaction or
+arrangement, you convey, or propagate by procuring conveyance of, a
+covered work, and grant a patent license to some of the parties
+receiving the covered work authorizing them to use, propagate, modify
+or convey a specific copy of the covered work, then the patent license
+you grant is automatically extended to all recipients of the covered
+work and works based on it.
+
+ A patent license is "discriminatory" if it does not include within
+the scope of its coverage, prohibits the exercise of, or is
+conditioned on the non-exercise of one or more of the rights that are
+specifically granted under this License. You may not convey a covered
+work if you are a party to an arrangement with a third party that is
+in the business of distributing software, under which you make payment
+to the third party based on the extent of your activity of conveying
+the work, and under which the third party grants, to any of the
+parties who would receive the covered work from you, a discriminatory
+patent license (a) in connection with copies of the covered work
+conveyed by you (or copies made from those copies), or (b) primarily
+for and in connection with specific products or compilations that
+contain the covered work, unless you entered into that arrangement,
+or that patent license was granted, prior to 28 March 2007.
+
+ Nothing in this License shall be construed as excluding or limiting
+any implied license or other defenses to infringement that may
+otherwise be available to you under applicable patent law.
+
+ 12. No Surrender of Others' Freedom.
+
+ If conditions are imposed on you (whether by court order, agreement or
+otherwise) that contradict the conditions of this License, they do not
+excuse you from the conditions of this License. If you cannot convey a
+covered work so as to satisfy simultaneously your obligations under this
+License and any other pertinent obligations, then as a consequence you may
+not convey it at all. For example, if you agree to terms that obligate you
+to collect a royalty for further conveying from those to whom you convey
+the Program, the only way you could satisfy both those terms and this
+License would be to refrain entirely from conveying the Program.
+
+ 13. Use with the GNU Affero General Public License.
+
+ Notwithstanding any other provision of this License, you have
+permission to link or combine any covered work with a work licensed
+under version 3 of the GNU Affero General Public License into a single
+combined work, and to convey the resulting work. The terms of this
+License will continue to apply to the part which is the covered work,
+but the special requirements of the GNU Affero General Public License,
+section 13, concerning interaction through a network will apply to the
+combination as such.
+
+ 14. Revised Versions of this License.
+
+ The Free Software Foundation may publish revised and/or new versions of
+the GNU General Public License from time to time. Such new versions will
+be similar in spirit to the present version, but may differ in detail to
+address new problems or concerns.
+
+ Each version is given a distinguishing version number. If the
+Program specifies that a certain numbered version of the GNU General
+Public License "or any later version" applies to it, you have the
+option of following the terms and conditions either of that numbered
+version or of any later version published by the Free Software
+Foundation. If the Program does not specify a version number of the
+GNU General Public License, you may choose any version ever published
+by the Free Software Foundation.
+
+ If the Program specifies that a proxy can decide which future
+versions of the GNU General Public License can be used, that proxy's
+public statement of acceptance of a version permanently authorizes you
+to choose that version for the Program.
+
+ Later license versions may give you additional or different
+permissions. However, no additional obligations are imposed on any
+author or copyright holder as a result of your choosing to follow a
+later version.
+
+ 15. Disclaimer of Warranty.
+
+ THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY
+APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT
+HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY
+OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM
+IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF
+ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
+
+ 16. Limitation of Liability.
+
+ IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
+WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS
+THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY
+GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE
+USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF
+DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD
+PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS),
+EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF
+SUCH DAMAGES.
+
+ 17. Interpretation of Sections 15 and 16.
+
+ If the disclaimer of warranty and limitation of liability provided
+above cannot be given local legal effect according to their terms,
+reviewing courts shall apply local law that most closely approximates
+an absolute waiver of all civil liability in connection with the
+Program, unless a warranty or assumption of liability accompanies a
+copy of the Program in return for a fee.
+
+ END OF TERMS AND CONDITIONS
+
+ How to Apply These Terms to Your New Programs
+
+ If you develop a new program, and you want it to be of the greatest
+possible use to the public, the best way to achieve this is to make it
+free software which everyone can redistribute and change under these terms.
+
+ To do so, attach the following notices to the program. It is safest
+to attach them to the start of each source file to most effectively
+state the exclusion of warranty; and each file should have at least
+the "copyright" line and a pointer to where the full notice is found.
+
+ <one line to give the program's name and a brief idea of what it does.>
+ Copyright (C) <year> <name of author>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+Also add information on how to contact you by electronic and paper mail.
+
+ If the program does terminal interaction, make it output a short
+notice like this when it starts in an interactive mode:
+
+ <program> Copyright (C) <year> <name of author>
+ This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
+ This is free software, and you are welcome to redistribute it
+ under certain conditions; type `show c' for details.
+
+The hypothetical commands `show w' and `show c' should show the appropriate
+parts of the General Public License. Of course, your program's commands
+might be different; for a GUI interface, you would use an "about box".
+
+ You should also get your employer (if you work as a programmer) or school,
+if any, to sign a "copyright disclaimer" for the program, if necessary.
+For more information on this, and how to apply and follow the GNU GPL, see
+<https://www.gnu.org/licenses/>.
+
+ The GNU General Public License does not permit incorporating your program
+into proprietary programs. If your program is a subroutine library, you
+may consider it more useful to permit linking proprietary applications with
+the library. If this is what you want to do, use the GNU Lesser General
+Public License instead of this License. But first, please read
+<https://www.gnu.org/licenses/why-not-lgpl.html>.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..c5716b5
--- /dev/null
+++ b/README.md
@@ -0,0 +1,159 @@
+# go-orchestrator
+
+[![CircleCI](https://circleci.com/gh/netdata/go-orchestrator.svg?style=svg)](https://circleci.com/gh/netdata/go-orchestrator)
+
+This library is a tool for writing [netdata](https://github.com/netdata/netdata) plugins.
+
+We strongly believe that custom plugins are very important and they must be easy to write.
+
+
+Definitions:
+ - orchestrator
+ > plugin orchestrators are external plugins that do not collect any data by themselves. Instead they support data collection modules written in the language of the orchestrator. Usually the orchestrator provides a higher level abstraction, making it ideal for writing new data collection modules with the minimum of code.
+
+ - plugin
+ > plugin is a set of data collection modules.
+
+ - module
+ > module is a data collector. It collects, processes and returns processed data to the orchestrator.
+
+ - job
+ > job is a module instance with specific settings.
+
+
+Package provides:
+ - CLI parser
+ - plugin orchestrator (loads configurations, creates and serves jobs)
+
+You are responsible only for __creating modules__.
+
+## Custom plugin example
+
+[Yep! So easy!](https://github.com/netdata/go-orchestrator/blob/master/examples/simple/main.go)
+
+## How to write a Module
+
+Module is responsible for **creating charts** and **collecting data**. Implement the Module interface and that is it.
+
+```go
+type Module interface {
+ // Init does initialization.
+ // If it returns false, the job will be disabled.
+ Init() bool
+
+ // Check is called after Init.
+ // If it returns false, the job will be disabled.
+ Check() bool
+
+ // Charts returns the chart definition.
+ // Make sure not to share returned instance.
+ Charts() *Charts
+
+ // Collect collects metrics.
+ Collect() map[string]int64
+
+ // SetLogger sets logger.
+ SetLogger(l *logger.Logger)
+
+ // Cleanup performs cleanup if needed.
+ Cleanup()
+}
+
+// Base is a helper struct. All modules should embed this struct.
+type Base struct {
+ *logger.Logger
+}
+
+// SetLogger sets logger.
+func (b *Base) SetLogger(l *logger.Logger) { b.Logger = l }
+
+```
+
+## How to write a Plugin
+
+Since plugin is a set of modules all you need is:
+ - write module(s)
+ - add module(s) to the plugin's [registry](https://github.com/netdata/go-orchestrator/blob/master/module/registry.go)
+ - start the plugin
+
+
+## How to integrate your plugin into Netdata
+
+Three simple steps:
+ - move the plugin to the `plugins.d` dir.
+ - add plugin configuration file to the `etc/netdata/` dir.
+ - add modules configuration files to the `etc/netdata/<DIR_NAME>/` dir.
+
+Congratulations!
+
+## Configurations
+
+Configurations are written in [YAML](https://yaml.org/).
+
+ - plugin configuration:
+
+```yaml
+
+# Enable/disable the whole plugin.
+enabled: yes
+
+# Default enable/disable value for all modules.
+default_run: yes
+
+# Maximum number of used CPUs. Zero means no limit.
+max_procs: 0
+
+# Enable/disable specific plugin module
+modules:
+# module_name1: yes
+# module_name2: yes
+
+```
+
+ - module configuration
+
+```yaml
+# [ GLOBAL ]
+update_every: 1
+autodetection_retry: 0
+
+# [ JOBS ]
+jobs:
+ - name: job1
+ param1: value1
+ param2: value2
+
+ - name: job2
+ param1: value1
+ param2: value2
+```
+
+Plugin uses `yaml.Unmarshal` to add configuration parameters to the module. Please use `yaml` tags!
+
+## Debug
+
+Plugin CLI:
+```
+Usage:
+ plugin [OPTIONS] [update every]
+
+Application Options:
+ -d, --debug debug mode
+ -m, --modules= modules name (default: all)
+ -c, --config= config dir
+
+Help Options:
+ -h, --help Show this help message
+
+```
+
+Specific module debug:
+```
+# become user netdata
+sudo su -s /bin/bash netdata
+
+# run plugin in debug mode
+./<plugin_name> -d -m <module_name>
+```
+
+Change `<plugin_name>` to your plugin name and `<module_name>` to the module name you want to debug.
diff --git a/cli/cli.go b/cli/cli.go
new file mode 100644
index 0000000..784de59
--- /dev/null
+++ b/cli/cli.go
@@ -0,0 +1,39 @@
+package cli
+
+import (
+ "strconv"
+
+ "github.com/jessevdk/go-flags"
+)
+
+// Option defines command line options.
+type Option struct {
+	// UpdateEvery is the data collection frequency in seconds. It has no CLI
+	// flag: Parse fills it from the trailing positional argument (default 1).
+	UpdateEvery int
+	Debug bool `short:"d" long:"debug" description:"debug mode"`
+	Module string `short:"m" long:"modules" description:"modules name" default:"all"`
+	ConfigDir []string `short:"c" long:"config" description:"config dir"`
+	Version bool `short:"v" long:"version" description:"display the version and exit"`
+}
+
+// Parse returns parsed command-line flags in Option struct.
+// args is expected to include the program name at index 0; a positional
+// argument after the flags overrides the default update frequency.
+func Parse(args []string) (*Option, error) {
+	opt := &Option{UpdateEvery: 1}
+
+	parser := flags.NewParser(opt, flags.Default)
+	parser.Name = "orchestrator"
+	parser.Usage = "[OPTIONS] [update every]"
+
+	rest, err := parser.ParseArgs(args)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(rest) > 1 {
+		opt.UpdateEvery, err = strconv.Atoi(rest[1])
+		if err != nil {
+			return nil, err
+		}
+	}
+	return opt, nil
+}
diff --git a/create.go b/create.go
new file mode 100644
index 0000000..c1eeecb
--- /dev/null
+++ b/create.go
@@ -0,0 +1,221 @@
+package orchestrator
+
+import (
+ "fmt"
+ "io"
+ "os"
+ "path"
+ "regexp"
+
+ "github.com/netdata/go-orchestrator/module"
+ "github.com/netdata/go-orchestrator/pkg/multipath"
+
+ "gopkg.in/yaml.v2"
+)
+
+const (
+	// defaultUpdateEvery is the default data collection frequency in seconds.
+	defaultUpdateEvery = 1
+	// defaultAutoDetectionRetry disables job re-check scheduling by default.
+	defaultAutoDetectionRetry = 0
+	// DefaultJobPriority is the default netdata chart priority for jobs.
+	DefaultJobPriority = 70000
+)
+
+// newModuleConfig returns a moduleConfig pre-populated with package defaults.
+func newModuleConfig() *moduleConfig {
+	conf := &moduleConfig{}
+	conf.UpdateEvery = defaultUpdateEvery
+	conf.AutoDetectionRetry = defaultAutoDetectionRetry
+	conf.Priority = DefaultJobPriority
+	return conf
+}
+
+// moduleConfig mirrors a module configuration file.
+type moduleConfig struct {
+	// UpdateEvery is the default data collection frequency (seconds) for all jobs.
+	UpdateEvery int `yaml:"update_every"`
+	// AutoDetectionRetry is the default job re-check interval (seconds); 0 disables it.
+	AutoDetectionRetry int `yaml:"autodetection_retry"`
+	// Priority is the default netdata chart priority for all jobs.
+	Priority int `yaml:"priority"`
+	// Jobs holds the raw per-job settings as decoded from YAML.
+	Jobs []map[string]interface{} `yaml:"jobs"`
+
+	// name is the module name; set by loadModuleConfig, not read from YAML.
+	name string
+}
+
+// setGlobalDefaults overrides the package defaults with the module creator's
+// own defaults; only positive values take effect.
+func (m *moduleConfig) setGlobalDefaults(defaults module.Defaults) {
+	if v := defaults.UpdateEvery; v > 0 {
+		m.UpdateEvery = v
+	}
+	if v := defaults.AutoDetectionRetry; v > 0 {
+		m.AutoDetectionRetry = v
+	}
+	if v := defaults.Priority; v > 0 {
+		m.Priority = v
+	}
+}
+
+// updateJobs fills per-job settings missing from the config with the
+// module-level defaults, then raises each job's update_every to at least
+// minUpdateEvery.
+func (m *moduleConfig) updateJobs(minUpdateEvery int) {
+	for _, job := range m.Jobs {
+		if _, ok := job["update_every"]; !ok {
+			job["update_every"] = m.UpdateEvery
+		}
+
+		if _, ok := job["autodetection_retry"]; !ok {
+			job["autodetection_retry"] = m.AutoDetectionRetry
+		}
+
+		if _, ok := job["priority"]; !ok {
+			job["priority"] = m.Priority
+		}
+
+		// The clamp runs after the default fill so it also applies to
+		// defaulted values; the type assertion skips non-int YAML values.
+		if v, ok := job["update_every"].(int); ok && v < minUpdateEvery {
+			job["update_every"] = minUpdateEvery
+		}
+	}
+}
+
+// loadModuleConfig loads the named module's configuration file.
+//
+// The file is looked up as <dir>/<name>.conf where dir is
+// ModulesConfigDirName, falling back to the plugin name. It returns nil when
+// the module should be skipped (read/parse error, or a config without jobs).
+// A missing file is not fatal: one job with default settings is used instead.
+func (o *Orchestrator) loadModuleConfig(name string) *moduleConfig {
+	log.Infof("loading '%s' configuration", name)
+
+	dirName := o.ModulesConfigDirName
+	if dirName == "" {
+		dirName = o.Name
+	}
+
+	modConf := newModuleConfig()
+	if creator, ok := o.Registry[name]; ok {
+		// Module creators may override the package-level defaults.
+		modConf.setGlobalDefaults(creator.Defaults)
+	}
+	modConf.name = name
+
+	configPath, err := o.ConfigPath.Find(fmt.Sprintf("%s/%s.conf", dirName, name))
+
+	if err != nil {
+		if !multipath.IsNotFound(err) {
+			log.Errorf("skipping '%s': %v", name, err)
+			return nil
+		}
+
+		// Not-found is recoverable: run a single job with default settings.
+		log.Warningf("'%s': %v, will use default 1 job configuration", name, err)
+		modConf.Jobs = []map[string]interface{}{{}}
+		return modConf
+	}
+
+	if err = loadYAML(modConf, configPath); err != nil {
+		log.Errorf("skipping '%s': %v", name, err)
+		return nil
+	}
+
+	if len(modConf.Jobs) == 0 {
+		log.Errorf("skipping '%s': config 'jobs' section is empty or does not exist", name)
+		return nil
+	}
+
+	return modConf
+}
+
+// space matches one or more consecutive whitespace characters.
+var space = regexp.MustCompile(`\s+`)
+
+// cleanJobName replaces every whitespace run in name with a single underscore.
+func cleanJobName(name string) string {
+	return space.ReplaceAllString(name, "_")
+}
+
+// createModuleJobs builds one Job per entry in the module config.
+// Entries whose settings fail to unmarshal are skipped (logged, not fatal).
+// js, when non-nil, lists jobs active on the previous run; matching jobs
+// that have no explicit autodetection setting get recovery retry settings.
+func (o *Orchestrator) createModuleJobs(modConf *moduleConfig, js *jobsStatuses) []Job {
+	var jobs []Job
+
+	creator := o.Registry[modConf.name]
+	modConf.updateJobs(o.Option.UpdateEvery)
+
+	// jobName extracts the configured job name, used only in log messages.
+	jobName := func(conf map[string]interface{}) interface{} {
+		if name, ok := conf["name"]; ok {
+			return name
+		}
+		return "unnamed"
+	}
+
+	for _, conf := range modConf.Jobs {
+		mod := creator.Create()
+
+		// First apply the per-job settings to the module itself...
+		if err := unmarshal(conf, mod); err != nil {
+			log.Errorf("skipping %s[%s]: %s", modConf.name, jobName(conf), err)
+			continue
+		}
+
+		job := module.NewJob(o.Name, modConf.name, mod, o.Out)
+
+		// ...then to the job wrapper (update_every, priority, etc.).
+		if err := unmarshal(conf, job); err != nil {
+			log.Errorf("skipping %s[%s]: %s", modConf.name, jobName(conf), err)
+			continue
+		}
+
+		// Normalize whitespace in the job name (runs become underscores).
+		job.Nam = cleanJobName(job.Nam)
+
+		if js != nil && js.contains(Job(job)) && job.AutoDetectEvery == 0 {
+			log.Infof("%s[%s] was active on previous run, applying recovering settings", job.ModuleName(), job.Name())
+			job.AutoDetectTries = 11
+			job.AutoDetectEvery = 30
+		}
+
+		jobs = append(jobs, job)
+	}
+
+	return jobs
+}
+
+// createJobs builds jobs for every registered module with a usable
+// configuration. Previously-active jobs (from the saved statuses file)
+// get recovery settings applied in createModuleJobs.
+func (o *Orchestrator) createJobs() []Job {
+	var jobs []Job
+
+	js, err := o.loadJobsStatuses()
+	if err != nil {
+		// Statuses are an optimization only; proceed without them.
+		log.Warning(err)
+	}
+
+	for name := range o.modules {
+		conf := o.loadModuleConfig(name)
+		if conf == nil {
+			continue
+		}
+
+		jobs = append(jobs, o.createModuleJobs(conf, js)...)
+	}
+
+	return jobs
+}
+
+// loadJobsStatuses reads the saved jobs statuses file from varLibDir.
+// It returns (nil, nil) when varLibDir is unset (statuses disabled).
+func (o *Orchestrator) loadJobsStatuses() (*jobsStatuses, error) {
+	if o.varLibDir == "" {
+		return nil, nil
+	}
+
+	// NOTE(review): path.Join is slash-only; filepath.Join is the OS-portable
+	// choice for filesystem paths — confirm whether non-Unix targets matter.
+	name := path.Join(o.varLibDir, jobStatusesFile)
+	v, err := loadJobsStatusesFromFile(name)
+	if err != nil {
+		return nil, fmt.Errorf("error on loading '%s' : %v", name, err)
+	}
+	return v, nil
+}
+
+// unmarshal copies conf into module by round-tripping through YAML, so the
+// target's `yaml` struct tags drive the field mapping.
+func unmarshal(conf interface{}, module interface{}) error {
+	b, err := yaml.Marshal(conf)
+	if err != nil {
+		return err
+	}
+	return yaml.Unmarshal(b, module)
+}
+
+// loadYAML decodes the YAML file at filename into conf.
+// An empty file is not an error: conf is left untouched.
+func loadYAML(conf interface{}, filename string) error {
+	file, err := os.Open(filename)
+	if err != nil {
+		log.Debug("open file ", filename, ": ", err)
+		return err
+	}
+	// Defer only after the error check: deferring Close on a nil *os.File
+	// (failed Open) was pointless and misleading.
+	defer file.Close()
+
+	if err = yaml.NewDecoder(file).Decode(conf); err != nil {
+		if err == io.EOF {
+			log.Debug("config file is empty")
+			return nil
+		}
+		log.Debug("read YAML ", filename, ": ", err)
+		return err
+	}
+
+	return nil
+}
diff --git a/create_test.go b/create_test.go
new file mode 100644
index 0000000..277eea5
--- /dev/null
+++ b/create_test.go
@@ -0,0 +1,236 @@
+package orchestrator
+
+import (
+ "fmt"
+ "path"
+ "testing"
+
+ "github.com/netdata/go-orchestrator/cli"
+ "github.com/netdata/go-orchestrator/module"
+ "github.com/netdata/go-orchestrator/pkg/multipath"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// Test_loadModuleConfigNoConfig: a missing config file is not fatal — the
+// loader falls back to a single default job configuration.
+func Test_loadModuleConfigNoConfig(t *testing.T) {
+	o := New()
+	o.Name = "test.d"
+	o.ConfigPath = multipath.New("./testdata")
+	assert.NotNil(t, o.loadModuleConfig("no_config"))
+}
+
+// Test_loadModuleConfigBrokenConfig: unparsable YAML disables the module.
+func Test_loadModuleConfigBrokenConfig(t *testing.T) {
+	o := New()
+	o.Name = "test.d"
+	o.ConfigPath = multipath.New("./testdata")
+	assert.Nil(t, o.loadModuleConfig("module-broken"))
+}
+
+// Test_loadModuleConfigNoJobs: an empty 'jobs' section disables the module.
+func Test_loadModuleConfigNoJobs(t *testing.T) {
+	o := New()
+	o.Name = "test.d"
+	o.ConfigPath = multipath.New("./testdata")
+	assert.Nil(t, o.loadModuleConfig("module-no-jobs"))
+}
+
+// Test_loadModuleConfig: a valid config yields all of its jobs.
+func Test_loadModuleConfig(t *testing.T) {
+	o := New()
+	o.Name = "test.d"
+	o.ConfigPath = multipath.New("./testdata")
+	o.ModulesConfigDirName = "test.d"
+	conf := o.loadModuleConfig("module1")
+	require.NotNil(t, conf)
+	assert.Equal(t, 3, len(conf.Jobs))
+}
+
+// Test_loadModuleConfigNotFound: a missing config dir also falls back to a
+// single default job.
+func Test_loadModuleConfigNotFound(t *testing.T) {
+	o := New()
+	o.Name = "test.d"
+	o.ConfigPath = multipath.New("./testdata")
+	o.ModulesConfigDirName = "test_not_exist.d"
+	conf := o.loadModuleConfig("module1")
+	require.NotNil(t, conf)
+	assert.Equal(t, 1, len(conf.Jobs))
+}
+
+// Test_createModuleJobs: every entry in the module config yields one job.
+func Test_createModuleJobs(t *testing.T) {
+	o := New()
+	o.Name = "test.d"
+	o.ConfigPath = multipath.New("./testdata")
+	o.Option = &cli.Option{}
+	reg := make(module.Registry)
+	reg.Register(
+		"module1",
+		module.Creator{Create: func() module.Module { return &module.MockModule{} }},
+	)
+
+	o.Registry = reg
+	conf := newModuleConfig()
+	conf.Jobs = []map[string]interface{}{{}, {}, {}}
+	conf.name = "module1"
+	assert.Len(t, o.createModuleJobs(conf, nil), 3)
+}
+
+// Test_createModuleJobsWithJobsStatuses verifies that jobs present in the
+// saved statuses file get the recovery autodetection settings applied.
+func Test_createModuleJobsWithJobsStatuses(t *testing.T) {
+	o := New()
+	o.Name = "test.d"
+	o.ConfigPath = multipath.New("./testdata")
+	o.Option = &cli.Option{}
+	reg := make(module.Registry)
+	reg.Register(
+		"module1",
+		module.Creator{Create: func() module.Module { return &module.MockModule{} }},
+	)
+
+	o.Registry = reg
+	conf := newModuleConfig()
+	conf.Jobs = []map[string]interface{}{{}, {}, {}}
+	conf.name = "module1"
+
+	js, err := loadJobsStatusesFromFile(path.Join("./testdata", "god-jobs-statuses.json"))
+	require.NoError(t, err)
+	// Route the statuses dump through the test logger instead of stdout,
+	// so it only appears with -v or on failure.
+	t.Log(fmt.Sprint(js.items))
+
+	jobs := o.createModuleJobs(conf, js)
+	assert.Len(t, jobs, 3)
+	for _, job := range jobs {
+		j := job.(*module.Job)
+		assert.Equal(t, 11, j.AutoDetectTries)
+		assert.Equal(t, 30, j.AutoDetectEvery)
+	}
+}
+
+// TestPluginConfig_isModuleEnabled checks enable/disable resolution:
+// explicitly listed modules follow their own flag regardless of the second
+// argument; unlisted modules follow DefaultRun, with the behavior for the
+// explicit=true case exercised under both DefaultRun values.
+func TestPluginConfig_isModuleEnabled(t *testing.T) {
+	modName1 := "modName1"
+	modName2 := "modName2"
+	modName3 := "modName3"
+
+	conf := Config{
+		DefaultRun: true,
+		Modules: map[string]bool{
+			modName1: true,
+			modName2: false,
+		},
+	}
+
+	assert.True(t, conf.isModuleEnabled(modName1, false))
+	assert.False(t, conf.isModuleEnabled(modName2, false))
+	assert.Equal(
+		t,
+		conf.DefaultRun,
+		conf.isModuleEnabled(modName3, false),
+	)
+
+	assert.True(t, conf.isModuleEnabled(modName1, true))
+	assert.False(t, conf.isModuleEnabled(modName2, true))
+	assert.Equal(
+		t,
+		!conf.DefaultRun,
+		conf.isModuleEnabled(modName3, true),
+	)
+
+	conf.DefaultRun = false
+
+	assert.True(t, conf.isModuleEnabled(modName1, false))
+	assert.False(t, conf.isModuleEnabled(modName2, false))
+	assert.Equal(
+		t,
+		conf.DefaultRun,
+		conf.isModuleEnabled(modName3, false),
+	)
+
+	assert.True(t, conf.isModuleEnabled(modName1, true))
+	assert.False(t, conf.isModuleEnabled(modName2, true))
+	assert.Equal(
+		t,
+		conf.DefaultRun,
+		conf.isModuleEnabled(modName3, true),
+	)
+
+}
+
+// TestModuleConfig_updateJobs: missing per-job settings are filled with the
+// package defaults; explicit job values win.
+func TestModuleConfig_updateJobs(t *testing.T) {
+	conf := newModuleConfig()
+	conf.Jobs = []map[string]interface{}{
+		{"name": "job1"},
+		{"name": "job2", "update_every": 10},
+	}
+	conf.updateJobs(0)
+
+	assert.Equal(
+		t,
+		[]map[string]interface{}{
+			{
+				"name": "job1",
+				"update_every": defaultUpdateEvery,
+				"autodetection_retry": defaultAutoDetectionRetry,
+				"priority": DefaultJobPriority,
+			},
+			{
+				"name": "job2",
+				"update_every": 10,
+				"autodetection_retry": defaultAutoDetectionRetry,
+				"priority": DefaultJobPriority,
+			},
+		},
+		conf.Jobs,
+	)
+}
+
+// TestModuleConfig_UpdateJobsRewriteModuleUpdateEvery: a module-level
+// default (via setGlobalDefaults) replaces the package default for jobs
+// that don't set update_every themselves.
+func TestModuleConfig_UpdateJobsRewriteModuleUpdateEvery(t *testing.T) {
+	conf := newModuleConfig()
+	conf.setGlobalDefaults(module.Defaults{UpdateEvery: 20})
+	conf.Jobs = []map[string]interface{}{
+		{"name": "job1"},
+		{"name": "job2", "update_every": 10},
+	}
+	conf.updateJobs(0)
+
+	assert.Equal(
+		t,
+		[]map[string]interface{}{
+			{
+				"name": "job1",
+				"update_every": 20,
+				"autodetection_retry": defaultAutoDetectionRetry,
+				"priority": DefaultJobPriority,
+			},
+			{
+				"name": "job2",
+				"update_every": 10,
+				"autodetection_retry": defaultAutoDetectionRetry,
+				"priority": DefaultJobPriority,
+			},
+		},
+		conf.Jobs,
+	)
+}
+
+// TestModuleConfig_UpdateJobsRewritePluginUpdateEvery: the plugin-wide
+// minimum raises defaulted values but not explicit larger ones.
+func TestModuleConfig_UpdateJobsRewritePluginUpdateEvery(t *testing.T) {
+	conf := newModuleConfig()
+	conf.Jobs = []map[string]interface{}{
+		{"name": "job1"},
+		{"name": "job2", "update_every": 10},
+	}
+	conf.updateJobs(5)
+
+	assert.Equal(
+		t,
+		[]map[string]interface{}{
+			{
+				"name": "job1",
+				"update_every": 5,
+				"autodetection_retry": defaultAutoDetectionRetry,
+				"priority": DefaultJobPriority,
+			},
+			{
+				"name": "job2",
+				"update_every": 10,
+				"autodetection_retry": defaultAutoDetectionRetry,
+				"priority": DefaultJobPriority,
+			},
+		},
+		conf.Jobs,
+	)
+}
diff --git a/examples/config/module.conf b/examples/config/module.conf
new file mode 100644
index 0000000..2e94365
--- /dev/null
+++ b/examples/config/module.conf
@@ -0,0 +1,66 @@
+# netdata plugin configuration for example
+#
+# This file is in YaML format. Generally the format is:
+#
+# name: value
+#
+# There are 2 sections:
+# - GLOBAL
+# - JOBS
+#
+#
+# [ GLOBAL ]
+# These variables set the defaults for all JOBs, however each JOB may define its own, overriding the defaults.
+#
+# The GLOBAL section format:
+# param1: value1
+# param2: value2
+#
+# Currently supported global parameters:
+# - update_every
+# Data collection frequency in seconds. Default: 1.
+#
+# - autodetection_retry
+# Re-check interval in seconds. Attempts to start the job are made once every interval.
+# Zero means not to schedule re-check. Default: 0.
+#
+#
+# [ JOBS ]
+# JOBS allow you to collect values from multiple sources.
+# Each source will have its own set of charts.
+#
+# IMPORTANT:
+# - Parameter 'name' is mandatory.
+# - Jobs with the same name are mutually exclusive. Only one of them will be allowed to run at any time.
+#
+# This allows autodetection to try several alternatives and pick the one that works.
+# Any number of jobs is supported.
+#
+# The JOBS section format:
+#
+# jobs:
+# - name: job1
+# param1: value1
+# param2: value2
+#
+# - name: job2
+# param1: value1
+# param2: value2
+#
+# - name: job2
+# param1: value1
+#
+#
+# ------------------------------------------------MODULE-CONFIGURATION--------------------------------------------------
+# [ GLOBAL ]
+update_every: 1
+autodetection_retry: 0
+
+# [ JOBS ]
+jobs:
+ - name: job1
+ param1: value1
+ param2: value2
+
+ - name: job2
+ param1: value1
+ param2: value2 \ No newline at end of file
diff --git a/examples/config/plugin.conf b/examples/config/plugin.conf
new file mode 100644
index 0000000..479f681
--- /dev/null
+++ b/examples/config/plugin.conf
@@ -0,0 +1,15 @@
+# This file is in YaML format.
+
+# Enable/disable the whole plugin.
+enabled: yes
+
+# Default enable/disable value for all modules.
+default_run: yes
+
+# Maximum number of used CPUs. Zero means no limit.
+max_procs: 0
+
+# Enable/disable specific plugin module
+modules:
+# module_name1: yes
+# module_name2: yes
diff --git a/examples/simple/main.go b/examples/simple/main.go
new file mode 100644
index 0000000..0a1b883
--- /dev/null
+++ b/examples/simple/main.go
@@ -0,0 +1,85 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "math/rand"
+ "os"
+
+ "github.com/netdata/go-orchestrator"
+ "github.com/netdata/go-orchestrator/cli"
+ "github.com/netdata/go-orchestrator/logger"
+ "github.com/netdata/go-orchestrator/module"
+)
+
+var version = "v0.0.1-example"
+
+// example is a minimal data collection module producing two random dimensions.
+type example struct{ module.Base }
+
+// Cleanup is a no-op; the module holds no resources.
+func (example) Cleanup() {}
+
+// Init always succeeds.
+func (example) Init() bool { return true }
+
+// Check always succeeds.
+func (example) Check() bool { return true }
+
+// Charts returns a single chart with two random-number dimensions.
+func (example) Charts() *module.Charts {
+	return &module.Charts{
+		{
+			ID: "random",
+			Title: "A Random Number", Units: "random", Fam: "random",
+			Dims: module.Dims{
+				{ID: "random0", Name: "random 0"},
+				{ID: "random1", Name: "random 1"},
+			},
+		},
+	}
+}
+
+// Collect returns a fresh random value for each dimension.
+func (e *example) Collect() map[string]int64 {
+	return map[string]int64{
+		"random0": rand.Int63n(100),
+		"random1": rand.Int63n(100),
+	}
+}
+
+// main wires the example module into an orchestrator and serves it.
+func main() {
+	opt := parseCLI()
+
+	if opt.Debug {
+		logger.SetSeverity(logger.DEBUG)
+	}
+	if opt.Version {
+		fmt.Println(version)
+		os.Exit(0)
+	}
+
+	// Make the module available to the orchestrator under the name "example".
+	module.Register("example", module.Creator{Create: func() module.Module { return &example{} }})
+
+	p := newPlugin(opt)
+
+	// Setup returns false when the plugin should not run (see orchestrator).
+	if !p.Setup() {
+		return
+	}
+
+	p.Serve()
+}
+
+// newPlugin builds an orchestrator named "test.d" configured with opt.
+func newPlugin(opt *cli.Option) *orchestrator.Orchestrator {
+	orch := orchestrator.New()
+	orch.Name = "test.d"
+	orch.Option = opt
+	return orch
+}
+
+// parseCLI parses os.Args, exiting the process on any parse error
+// (exit 0 for a help request, exit 1 otherwise).
+// NOTE(review): cli.Parse is built on jessevdk/go-flags, whose help "error"
+// is a *flags.Error with Type flags.ErrHelp — that value is never equal to
+// the stdlib flag.ErrHelp compared against below, so -h likely exits with
+// status 1 instead of 0. Confirm and compare against flags.ErrHelp instead.
+func parseCLI() *cli.Option {
+	opt, err := cli.Parse(os.Args)
+	if err != nil {
+		if err != flag.ErrHelp {
+			os.Exit(1)
+		}
+		os.Exit(0)
+	}
+
+	return opt
+}
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..594a0df
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,11 @@
+module github.com/netdata/go-orchestrator
+
+go 1.13
+
+require (
+ github.com/jessevdk/go-flags v1.4.0
+ github.com/mattn/go-isatty v0.0.7
+ github.com/mitchellh/go-homedir v1.1.0
+ github.com/stretchr/testify v1.3.0
+ gopkg.in/yaml.v2 v2.2.2
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..27cb670
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,19 @@
+github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
+github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
+github.com/mattn/go-isatty v0.0.7 h1:UvyT9uN+3r7yLEYSlJsbQGdsaB/a0DlgWP3pql6iwOc=
+github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
+github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
+github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8=
+golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/logger/countwatcher.go b/logger/countwatcher.go
new file mode 100644
index 0000000..ccab2f0
--- /dev/null
+++ b/logger/countwatcher.go
@@ -0,0 +1,79 @@
+package logger
+
+import (
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+var (
+	// resetEvery is how often registered loggers' message counters are reset.
+	resetEvery = time.Second
+)
+
+// GlobalMsgCountWatcher is an initiated instance of MsgCountWatcher.
+// It resets the message counter of every registered logger once per second.
+var GlobalMsgCountWatcher = newMsgCountWatcher(resetEvery)
+
+// newMsgCountWatcher creates a MsgCountWatcher and starts its reset loop
+// in a background goroutine (stopped via stop).
+func newMsgCountWatcher(resetEvery time.Duration) *MsgCountWatcher {
+	t := &MsgCountWatcher{
+		ticker:   time.NewTicker(resetEvery),
+		shutdown: make(chan struct{}),
+		items:    make(map[int64]*Logger),
+	}
+	go t.start()
+
+	return t
+}
+
+// MsgCountWatcher periodically resets the message counter of every
+// registered logger.
+type MsgCountWatcher struct {
+	shutdown chan struct{} // signals the reset loop to exit
+	ticker   *time.Ticker  // drives the periodic reset
+
+	mux   sync.Mutex // protects items
+	items map[int64]*Logger
+}
+
+// Register adds logger to the collection, keyed by its unique id.
+func (m *MsgCountWatcher) Register(logger *Logger) {
+	m.mux.Lock()
+	defer m.mux.Unlock()
+
+	m.items[logger.id] = logger
+}
+
+// Unregister removes logger from the collection.
+func (m *MsgCountWatcher) Unregister(logger *Logger) {
+	m.mux.Lock()
+	defer m.mux.Unlock()
+
+	// delete is a no-op for a missing key, so no existence check is needed.
+	delete(m.items, logger.id)
+}
+
+// start runs the reset loop: on every tick it zeroes all registered
+// loggers' counters; a value on shutdown terminates the loop.
+func (m *MsgCountWatcher) start() {
+	for {
+		select {
+		case <-m.shutdown:
+			return
+		case <-m.ticker.C:
+			m.resetCount()
+		}
+	}
+}
+
+// stop terminates the reset loop and stops the ticker.
+// NOTE(review): the send on shutdown blocks until start receives it, so stop
+// must only be called once per watcher — confirm callers respect this.
+func (m *MsgCountWatcher) stop() {
+	m.shutdown <- struct{}{}
+	m.ticker.Stop()
+}
+
+// resetCount zeroes the message counter of every registered logger.
+// The counter is accessed atomically because loggers update it concurrently.
+func (m *MsgCountWatcher) resetCount() {
+	m.mux.Lock()
+	defer m.mux.Unlock()
+
+	for _, v := range m.items {
+		atomic.StoreInt64(&v.msgCount, 0)
+	}
+}
diff --git a/logger/countwatcher_test.go b/logger/countwatcher_test.go
new file mode 100644
index 0000000..e0914e3
--- /dev/null
+++ b/logger/countwatcher_test.go
@@ -0,0 +1,57 @@
+package logger
+
+import (
+ "io/ioutil"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// TestMsgCountWatcher_Register: registering stores the logger under its id.
+func TestMsgCountWatcher_Register(t *testing.T) {
+	cw := newMsgCountWatcher(time.Second)
+	defer cw.stop()
+
+	require.Len(t, cw.items, 0)
+
+	logger := New("", "", "")
+	cw.Register(logger)
+
+	require.Len(t, cw.items, 1)
+	require.Equal(t, logger, cw.items[logger.id])
+
+}
+
+// TestMsgCountWatcher_Unregister: unregistering removes the logger.
+func TestMsgCountWatcher_Unregister(t *testing.T) {
+	cw := newMsgCountWatcher(time.Second)
+	defer cw.stop()
+
+	require.Len(t, cw.items, 0)
+
+	logger := New("", "", "")
+	cw.items[logger.id] = logger
+	cw.Unregister(logger)
+
+	require.Len(t, cw.items, 0)
+}
+
+// TestMsgCountWatcher: the watcher zeroes msgCount of a registered logger
+// each interval; sleeping 2x the interval avoids timing flakiness.
+func TestMsgCountWatcher(t *testing.T) {
+	reset := time.Millisecond * 500
+	cw := newMsgCountWatcher(reset)
+	defer cw.stop()
+
+	logger := New("", "", "")
+	logger.limited = true
+	logger.formatter.SetOutput(ioutil.Discard)
+	cw.Register(logger)
+
+	for i := 0; i < 3; i++ {
+		for m := 0; m < 100; m++ {
+			logger.Info()
+		}
+		time.Sleep(reset * 2)
+		assert.Equal(t, int64(0), atomic.LoadInt64(&logger.msgCount))
+	}
+}
diff --git a/logger/formatter.go b/logger/formatter.go
new file mode 100644
index 0000000..f2df4c3
--- /dev/null
+++ b/logger/formatter.go
@@ -0,0 +1,198 @@
+package logger
+
+import (
+ "io"
+ "log"
+ "runtime"
+ "sync"
+ "time"
+)
+
+type (
+ formatter struct {
+ colored bool
+ prefix string
+ out io.Writer // destination for output
+ flag int // properties
+
+ mu sync.Mutex // ensures atomic writes; protects the following fields
+ buf []byte // for accumulating text to write
+ }
+)
+
+// newFormatter returns a formatter configured either for interactive CLI
+// output (colored, caller file:line, no timestamp/prefix) or for plain
+// file output (date+time timestamp plus the given prefix).
+func newFormatter(out io.Writer, isCLI bool, prefix string) *formatter {
+	f := &formatter{
+		out: out,
+		buf: make([]byte, 0, 120),
+	}
+	if isCLI {
+		f.colored = true
+		f.flag = log.Lshortfile
+	} else {
+		f.prefix = prefix + " "
+		f.flag = log.Ldate | log.Ltime
+	}
+	return f
+}
+
+// SetOutput sets the destination for log output.
+// The assignment is done under the formatter mutex so it cannot race
+// with a concurrent Output call writing to the old destination.
+func (l *formatter) SetOutput(out io.Writer) {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	l.out = out
+}
+
+// Output assembles one log line and writes it to the destination writer.
+// Line layout: timestamp, prefix, severity, module[job], file:line, message
+// (each part only when the corresponding flag/option is enabled).
+// callDepth is forwarded to runtime.Caller to attribute the message to the
+// caller's source location.
+func (l *formatter) Output(severity Severity, module, job string, callDepth int, s string) {
+	now := time.Now() // get this early.
+	var file string
+	var line int
+	// Resolve the caller only when file info is requested:
+	// runtime.Caller is comparatively expensive.
+	if l.flag&(log.Lshortfile|log.Llongfile) != 0 {
+		var ok bool
+		_, file, line, ok = runtime.Caller(callDepth)
+		if !ok {
+			file = "???"
+			line = 0
+		}
+	}
+
+	// The mutex guards both the shared buffer and the writer.
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	l.formatTimestamp(now)
+	l.buf = append(l.buf, l.prefix...)
+	l.formatSeverity(severity)
+	l.formatModuleJob(module, job)
+	l.formatFile(file, line)
+	l.buf = append(l.buf, s...)
+	// Ensure the line is newline-terminated.
+	if s == "" || s[len(s)-1] != '\n' {
+		l.buf = append(l.buf, '\n')
+	}
+	_, _ = l.out.Write(l.buf)
+	// Truncate, keeping capacity, so the buffer is reused next call.
+	l.buf = l.buf[:0]
+}
+
+// formatModuleJob appends "$module[$job] " to the buffer.
+func (l *formatter) formatModuleJob(module string, job string) {
+	buf := append(l.buf, module...)
+	buf = append(buf, '[')
+	buf = append(buf, job...)
+	l.buf = append(buf, "] "...)
+}
+
+// formatTimestamp writes timestamp to buf
+// format: "YYYY-MM-DD hh:mm:ss: " — parts controlled by the
+// Ldate/Ltime/Lmicroseconds/LUTC flags, mirroring the stdlib log package.
+func (l *formatter) formatTimestamp(t time.Time) {
+	if l.flag&(log.Ldate|log.Ltime|log.Lmicroseconds) != 0 {
+		if l.flag&log.LUTC != 0 {
+			t = t.UTC()
+		}
+		if l.flag&log.Ldate != 0 {
+			year, month, day := t.Date()
+			itoa(&l.buf, year, 4)
+			l.buf = append(l.buf, '-')
+			itoa(&l.buf, int(month), 2)
+			l.buf = append(l.buf, '-')
+			itoa(&l.buf, day, 2)
+			l.buf = append(l.buf, ' ')
+		}
+		if l.flag&(log.Ltime|log.Lmicroseconds) != 0 {
+			hour, min, sec := t.Clock()
+			itoa(&l.buf, hour, 2)
+			l.buf = append(l.buf, ':')
+			itoa(&l.buf, min, 2)
+			l.buf = append(l.buf, ':')
+			itoa(&l.buf, sec, 2)
+			if l.flag&log.Lmicroseconds != 0 {
+				l.buf = append(l.buf, '.')
+				itoa(&l.buf, t.Nanosecond()/1e3, 6)
+			}
+			l.buf = append(l.buf, ' ')
+		}
+		// Both branches above end with a trailing space; replace it with
+		// ':' and re-append a space, yielding "... hh:mm:ss: ".
+		l.buf[len(l.buf)-1] = ':'
+		l.buf = append(l.buf, ' ')
+	}
+}
+
+// formatSeverity write severity to buf
+// format (CLI): "[ $severity ]" with an ANSI color per level, the short
+// name padded to 5 characters so columns line up
+// format (file): "$severity: "
+func (l *formatter) formatSeverity(severity Severity) {
+	if l.colored {
+		switch severity {
+		case DEBUG:
+			l.buf = append(l.buf, "\x1b[0;36m[ "...) // Cyan text
+		case INFO:
+			l.buf = append(l.buf, "\x1b[0;32m[ "...) // Green text
+		case WARNING:
+			l.buf = append(l.buf, "\x1b[0;33m[ "...) // Yellow text
+		case ERROR:
+			l.buf = append(l.buf, "\x1b[0;31m[ "...) // Red text
+		case CRITICAL:
+			l.buf = append(l.buf, "\x1b[0;37;41m[ "...) // White text with Red background
+		}
+		putString(&l.buf, severity.ShortString(), 5)
+		l.buf = append(l.buf, " ]\x1b[0m "...) // clear color scheme
+	} else {
+		l.buf = append(l.buf, severity.String()...)
+		l.buf = append(l.buf, ": "...)
+	}
+}
+
+// formatFile appends "$file:$line " to the buffer (dimmed when colored).
+// With Lshortfile only the base name of the path is kept.
+func (l *formatter) formatFile(file string, line int) {
+	if l.flag&(log.Lshortfile|log.Llongfile) == 0 {
+		return
+	}
+	if l.flag&log.Lshortfile != 0 {
+		// Strip the directory part, keeping only the base name.
+		for i := len(file) - 1; i > 0; i-- {
+			if file[i] == '/' {
+				file = file[i+1:]
+				break
+			}
+		}
+	}
+
+	if l.colored {
+		l.buf = append(l.buf, "\x1b[0;90m"...)
+	}
+	l.buf = append(l.buf, file...)
+	l.buf = append(l.buf, ':')
+	itoa(&l.buf, line, -1)
+	if l.colored {
+		l.buf = append(l.buf, "\x1b[0m "...)
+	} else {
+		l.buf = append(l.buf, ' ')
+	}
+}
+
+// itoa Cheap integer to fixed-width decimal ASCII. Give a negative width to avoid zero-padding.
+// Adapted from the standard library log package; avoids strconv allocations
+// on the hot logging path.
+func itoa(buf *[]byte, i int, wid int) {
+	// Assemble decimal in reverse order.
+	var b [20]byte
+	bp := len(b) - 1
+	for i >= 10 || wid > 1 {
+		wid--
+		q := i / 10
+		b[bp] = byte('0' + i - q*10)
+		bp--
+		i = q
+	}
+	// i < 10
+	b[bp] = byte('0' + i)
+	*buf = append(*buf, b[bp:]...)
+}
+
+// putString appends s left-justified in a field of at least wid
+// characters (cheap sprintf("%-*s", wid, s)).
+func putString(buf *[]byte, s string, wid int) {
+	*buf = append(*buf, s...)
+	for i := len(s); i < wid; i++ {
+		*buf = append(*buf, ' ')
+	}
+}
diff --git a/logger/formatter_test.go b/logger/formatter_test.go
new file mode 100644
index 0000000..0d27384
--- /dev/null
+++ b/logger/formatter_test.go
@@ -0,0 +1,32 @@
+package logger
+
+import (
+ "bytes"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// TestFormatter_Output_cli checks the CLI layout: no timestamp, but
+// severity, module[job], caller file and the message are present.
+func TestFormatter_Output_cli(t *testing.T) {
+	buf := &bytes.Buffer{}
+	f := newFormatter(buf, true, "test")
+
+	f.Output(INFO, "mod1", "job1", 1, "hello")
+
+	got := buf.String()
+	assert.NotRegexp(t, `\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}: `, got)
+	assert.Contains(t, got, "INFO")
+	assert.Contains(t, got, "mod1[job1]")
+	assert.Contains(t, got, "formatter_test.go:")
+	assert.Contains(t, got, "hello")
+}
+
+// TestFormatter_Output_file checks the file layout: timestamp present,
+// no caller file info.
+func TestFormatter_Output_file(t *testing.T) {
+	buf := &bytes.Buffer{}
+	f := newFormatter(buf, false, "test")
+
+	f.Output(INFO, "mod1", "job1", 1, "hello")
+
+	got := buf.String()
+	assert.Regexp(t, `\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}: `, got)
+	assert.Contains(t, got, "INFO")
+	assert.Contains(t, got, "mod1[job1]")
+	assert.NotContains(t, got, "formatter_test.go:")
+	assert.Contains(t, got, "hello")
+}
diff --git a/logger/logger.go b/logger/logger.go
new file mode 100644
index 0000000..fe94d28
--- /dev/null
+++ b/logger/logger.go
@@ -0,0 +1,200 @@
+package logger
+
+import (
+ "fmt"
+ "os"
+ "sync/atomic"
+
+ "github.com/mattn/go-isatty"
+)
+
+const (
+ msgPerSecondLimit = 60
+)
+
+var (
+	// base is the fallback logger used when a Logger is nil/uninitialized.
+	base = New("plugin", "base", "base")
+	// initialID seeds the atomic counter behind createUniqueID.
+	initialID = int64(1)
+	// isCLI reports whether output should be colored CLI-style.
+	// NETDATA_FORCE_COLOR overrides terminal detection in both directions.
+	isCLI = func() bool {
+		switch os.Getenv("NETDATA_FORCE_COLOR") {
+		case "1", "true":
+			return true
+		case "0", "false":
+			// BUG FIX: these values explicitly disable colored output;
+			// the original returned true here, enabling it instead.
+			return false
+		default:
+			return isatty.IsTerminal(os.Stderr.Fd())
+		}
+	}()
+)
+
+// Logger represents a logger object
+type Logger struct {
+	formatter *formatter
+
+	// id is a process-unique identifier; it is the registry key used by
+	// the message-count watcher.
+	id int64
+	// modName and jobName are printed as "modName[jobName]" on every line.
+	modName string
+	jobName string
+
+	// limited enables the per-second rate limit (msgPerSecondLimit).
+	limited bool
+	// msgCount counts messages since the last watcher reset; it is
+	// accessed atomically.
+	msgCount int64
+}
+
+// New creates a new logger
+// pluginName is used as the line prefix in non-CLI mode; modName and
+// jobName appear in every message. Output goes to os.Stderr.
+func New(pluginName, modName, jobName string) *Logger {
+	return &Logger{
+		formatter: newFormatter(os.Stderr, isCLI, pluginName),
+		modName:   modName,
+		jobName:   jobName,
+		id:        createUniqueID(),
+	}
+}
+
+// NewLimited creates a new limited logger
+// A limited logger drops messages once msgPerSecondLimit is exceeded;
+// GlobalMsgCountWatcher periodically resets the counter.
+// NOTE(review): nothing unregisters the logger automatically — callers
+// that discard it should call GlobalMsgCountWatcher.Unregister.
+func NewLimited(pluginName, modName, jobName string) *Logger {
+	logger := New(pluginName, modName, jobName)
+	logger.limited = true
+	GlobalMsgCountWatcher.Register(logger)
+
+	return logger
+}
+
+// Panic logs a message with the Critical severity then panic
+func (l *Logger) Panic(a ...interface{}) {
+ s := fmt.Sprint(a...)
+ l.output(CRITICAL, 1, s)
+ panic(s)
+}
+
+// Critical logs a message with the Critical severity
+func (l *Logger) Critical(a ...interface{}) {
+ l.output(CRITICAL, 1, fmt.Sprint(a...))
+}
+
+// Error logs a message with the Error severity
+func (l *Logger) Error(a ...interface{}) {
+ l.output(ERROR, 1, fmt.Sprint(a...))
+}
+
+// Warning logs a message with the Warning severity
+func (l *Logger) Warning(a ...interface{}) {
+ l.output(WARNING, 1, fmt.Sprint(a...))
+}
+
+// Info logs a message with the Info severity
+func (l *Logger) Info(a ...interface{}) {
+ l.output(INFO, 1, fmt.Sprint(a...))
+}
+
+// Print logs a message with the Info severity (same as Info)
+func (l *Logger) Print(a ...interface{}) {
+ l.output(INFO, 1, fmt.Sprint(a...))
+}
+
+// Debug logs a message with the Debug severity
+func (l *Logger) Debug(a ...interface{}) {
+ l.output(DEBUG, 1, fmt.Sprint(a...))
+}
+
+// Panicln logs a message with the Critical severity then panic
+func (l *Logger) Panicln(a ...interface{}) {
+ s := fmt.Sprintln(a...)
+ l.output(CRITICAL, 1, s)
+ panic(s)
+}
+
+// Criticalln logs a message with the Critical severity
+func (l *Logger) Criticalln(a ...interface{}) {
+ l.output(CRITICAL, 1, fmt.Sprintln(a...))
+}
+
+// Errorln logs a message with the Error severity
+func (l *Logger) Errorln(a ...interface{}) {
+ l.output(ERROR, 1, fmt.Sprintln(a...))
+}
+
+// Warningln logs a message with the Warning severity
+func (l *Logger) Warningln(a ...interface{}) {
+ l.output(WARNING, 1, fmt.Sprintln(a...))
+}
+
+// Infoln logs a message with the Info severity
+func (l *Logger) Infoln(a ...interface{}) {
+ l.output(INFO, 1, fmt.Sprintln(a...))
+}
+
+// Println logs a message with the Info severity (same as Infoln)
+func (l *Logger) Println(a ...interface{}) {
+ l.output(INFO, 1, fmt.Sprintln(a...))
+}
+
+// Debugln logs a message with the Debug severity
+func (l *Logger) Debugln(a ...interface{}) {
+ l.output(DEBUG, 1, fmt.Sprintln(a...))
+}
+
+// Panicf logs a message with the Critical severity using the same syntax and options as fmt.Printf then panic
+func (l *Logger) Panicf(format string, a ...interface{}) {
+ s := fmt.Sprintf(format, a...)
+ l.output(CRITICAL, 1, s)
+ panic(s)
+}
+
+// Criticalf logs a message with the Critical severity using the same syntax and options as fmt.Printf
+func (l *Logger) Criticalf(format string, a ...interface{}) {
+ l.output(CRITICAL, 1, fmt.Sprintf(format, a...))
+}
+
+// Errorf logs a message with the Error severity using the same syntax and options as fmt.Printf
+func (l *Logger) Errorf(format string, a ...interface{}) {
+ l.output(ERROR, 1, fmt.Sprintf(format, a...))
+}
+
+// Warningf logs a message with the Warning severity using the same syntax and options as fmt.Printf
+func (l *Logger) Warningf(format string, a ...interface{}) {
+ l.output(WARNING, 1, fmt.Sprintf(format, a...))
+}
+
+// Infof logs a message with the Info severity using the same syntax and options as fmt.Printf
+func (l *Logger) Infof(format string, a ...interface{}) {
+ l.output(INFO, 1, fmt.Sprintf(format, a...))
+}
+
+// Printf logs a message with the Info severity using the same syntax and options as fmt.Printf
+func (l *Logger) Printf(format string, a ...interface{}) {
+ l.output(INFO, 1, fmt.Sprintf(format, a...))
+}
+
+// Debugf logs a message with the Debug severity using the same syntax and options as fmt.Printf
+func (l *Logger) Debugf(format string, a ...interface{}) {
+ l.output(DEBUG, 1, fmt.Sprintf(format, a...))
+}
+
+// output is the common sink for all logging methods.
+// It drops messages above the global severity, falls back to the package
+// base logger for a nil/zero-value receiver, and applies the per-second
+// rate limit for limited loggers (bypassed when the global level is DEBUG).
+func (l *Logger) output(severity Severity, callDepth int, msg string) {
+	if severity > globalSeverity {
+		return
+	}
+
+	// A nil or uninitialized Logger has no formatter; route through base
+	// so the call never panics.
+	if l == nil || l.formatter == nil {
+		base.formatter.Output(severity, base.modName, base.jobName, callDepth+2, msg)
+		return
+	}
+
+	// msgCount is reset periodically by the message-count watcher.
+	if l.limited && globalSeverity < DEBUG && atomic.AddInt64(&l.msgCount, 1) > msgPerSecondLimit {
+		return
+	}
+	l.formatter.Output(severity, l.modName, l.jobName, callDepth+2, msg)
+}
+
+// SetSeverity sets global severity level
+// NOTE(review): globalSeverity is written without synchronization; it is
+// expected to be set once during startup, before concurrent logging.
+func SetSeverity(severity Severity) {
+	globalSeverity = severity
+}
+
+// createUniqueID returns a process-unique logger ID (atomic counter).
+func createUniqueID() int64 {
+	return atomic.AddInt64(&initialID, 1)
+}
+
+// SetPluginName sets logger plugin name.
+// The name becomes the line prefix used by the logger's formatter.
+func SetPluginName(name string, log *Logger) {
+	log.formatter.prefix = name + " "
+}
diff --git a/logger/logger_test.go b/logger/logger_test.go
new file mode 100644
index 0000000..b78cd86
--- /dev/null
+++ b/logger/logger_test.go
@@ -0,0 +1,212 @@
+package logger
+
+import (
+ "bytes"
+ "io/ioutil"
+ "log"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// TestSetSeverity checks that SetSeverity updates the package severity.
+// NOTE(review): this test deliberately leaves the level at DEBUG — later
+// tests in this file (TestLogger_Debug*) depend on that side effect, and
+// TestLogger_Limited resets the level to INFO itself. Fragile ordering.
+func TestSetSeverity(t *testing.T) {
+	require.Equal(t, globalSeverity, INFO)
+	SetSeverity(DEBUG)
+
+	assert.Equal(t, globalSeverity, DEBUG)
+}
+
+func TestNew(t *testing.T) {
+ assert.IsType(
+ t,
+ (*Logger)(nil),
+ New("", "", ""),
+ )
+}
+
+func TestNewLimited(t *testing.T) {
+ logger := NewLimited("", "", "")
+ assert.True(t, logger.limited)
+
+ _, ok := GlobalMsgCountWatcher.items[logger.id]
+ require.True(t, ok)
+ GlobalMsgCountWatcher.Unregister(logger)
+}
+
+func TestLogger_Critical(t *testing.T) {
+ buf := bytes.Buffer{}
+ logger := New("", "", "")
+ logger.formatter.SetOutput(&buf)
+ logger.formatter.flag = log.Lshortfile
+ logger.Critical()
+ assert.Contains(t, buf.String(), CRITICAL.ShortString())
+ assert.Contains(t, buf.String(), " logger_test.go")
+}
+
+func TestLogger_Criticalf(t *testing.T) {
+ buf := bytes.Buffer{}
+ logger := New("", "", "")
+ logger.formatter.SetOutput(&buf)
+ logger.formatter.flag = log.Lshortfile
+ logger.Criticalf("")
+ assert.Contains(t, buf.String(), CRITICAL.ShortString())
+ assert.Contains(t, buf.String(), " logger_test.go")
+}
+
+func TestLogger_Error(t *testing.T) {
+ buf := bytes.Buffer{}
+ logger := New("", "", "")
+ logger.formatter.SetOutput(&buf)
+
+ logger.Error()
+ assert.Contains(t, buf.String(), ERROR.ShortString())
+}
+
+func TestLogger_Errorf(t *testing.T) {
+ buf := bytes.Buffer{}
+ logger := New("", "", "")
+ logger.formatter.SetOutput(&buf)
+
+ logger.Errorf("")
+ assert.Contains(t, buf.String(), ERROR.ShortString())
+}
+
+func TestLogger_Warning(t *testing.T) {
+ buf := bytes.Buffer{}
+ logger := New("", "", "")
+ logger.formatter.SetOutput(&buf)
+
+ logger.Warning()
+ assert.Contains(t, buf.String(), WARNING.ShortString())
+}
+
+func TestLogger_Warningf(t *testing.T) {
+ buf := bytes.Buffer{}
+ logger := New("", "", "")
+ logger.formatter.SetOutput(&buf)
+
+ logger.Warningf("")
+ assert.Contains(t, buf.String(), WARNING.ShortString())
+}
+
+func TestLogger_Info(t *testing.T) {
+ buf := bytes.Buffer{}
+ logger := New("", "", "")
+ logger.formatter.SetOutput(&buf)
+
+ logger.Info()
+ assert.Contains(t, buf.String(), INFO.ShortString())
+}
+
+func TestLogger_Infof(t *testing.T) {
+ buf := bytes.Buffer{}
+ logger := New("", "", "")
+ logger.formatter.SetOutput(&buf)
+
+ logger.Infof("")
+ assert.Contains(t, buf.String(), INFO.ShortString())
+}
+
+func TestLogger_Debug(t *testing.T) {
+ buf := bytes.Buffer{}
+ logger := New("", "", "")
+ logger.formatter.SetOutput(&buf)
+
+ logger.Debug()
+ assert.Contains(t, buf.String(), DEBUG.ShortString())
+}
+
+func TestLogger_Debugf(t *testing.T) {
+ buf := bytes.Buffer{}
+ logger := New("", "", "")
+ logger.formatter.SetOutput(&buf)
+
+ logger.Debugf("")
+ assert.Contains(t, buf.String(), DEBUG.ShortString())
+}
+
+func TestLogger_NotInitialized(t *testing.T) {
+ var logger Logger
+ f := func() {
+ logger.Info()
+ }
+ assert.NotPanics(t, f)
+}
+
+func TestLogger_NotInitializedPtr(t *testing.T) {
+ var logger *Logger
+ f := func() {
+ logger.Info()
+ }
+ assert.NotPanics(t, f)
+}
+
+func TestLogger_Unlimited(t *testing.T) {
+ logger := New("", "", "")
+
+ wr := countWriter(0)
+ logger.formatter.SetOutput(&wr)
+
+ num := 1000
+
+ for i := 0; i < num; i++ {
+ logger.Info()
+ }
+
+ require.Equal(t, num, int(wr))
+}
+
+func TestLogger_Limited(t *testing.T) {
+ SetSeverity(INFO)
+
+ logger := New("", "", "")
+ logger.limited = true
+
+ wr := countWriter(0)
+ logger.formatter.SetOutput(&wr)
+
+ num := 1000
+
+ for i := 0; i < num; i++ {
+ logger.Info()
+ }
+
+ require.Equal(t, msgPerSecondLimit, int(wr))
+}
+
+// TestLogger_Info_race exercises concurrent logging; run with -race.
+// Waits for all goroutines deterministically instead of sleeping a fixed
+// second, which neither guaranteed completion nor was fast.
+func TestLogger_Info_race(t *testing.T) {
+	logger := New("", "", "")
+	logger.formatter.SetOutput(ioutil.Discard)
+
+	done := make(chan struct{})
+	for i := 0; i < 10; i++ {
+		go func() {
+			defer func() { done <- struct{}{} }()
+			for j := 0; j < 10; j++ {
+				logger.Info("hello ", "world")
+			}
+		}()
+	}
+	for i := 0; i < 10; i++ {
+		select {
+		case <-done:
+		case <-time.After(time.Second):
+			t.Fatal("timed out waiting for logging goroutines")
+		}
+	}
+}
+
+// countWriter counts Write calls; the tests use it to assert how many
+// log lines were actually emitted.
+type countWriter int
+
+// Write increments the counter and reports the input as fully written.
+func (c *countWriter) Write(b []byte) (n int, err error) {
+	*c++
+	return len(b), nil
+}
+
+func BenchmarkLogger_Infof(b *testing.B) {
+ l := New("test", "test", "test")
+ l.formatter.SetOutput(ioutil.Discard)
+ for i := 0; i < b.N; i++ {
+ l.Infof("hello %s", "world")
+ }
+}
+
+func BenchmarkLog_Printf(b *testing.B) {
+ logger := log.New(ioutil.Discard, "", log.Lshortfile)
+ for i := 0; i < b.N; i++ {
+ logger.Printf("hello %s", "world")
+ }
+}
diff --git a/logger/severity.go b/logger/severity.go
new file mode 100644
index 0000000..d8d2f6e
--- /dev/null
+++ b/logger/severity.go
@@ -0,0 +1,53 @@
+package logger
+
+var globalSeverity = INFO
+
+// Severity is a logging severity level
+type Severity int
+
+const (
+ // CRITICAL severity level
+ CRITICAL Severity = iota
+ // ERROR severity level
+ ERROR
+ // WARNING severity level
+ WARNING
+ // INFO severity level
+ INFO
+ // DEBUG severity level
+ DEBUG
+)
+
+// String returns human readable string
+func (s Severity) String() string {
+	names := [...]string{"CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"}
+	if s >= 0 && int(s) < len(names) {
+		return names[s]
+	}
+	return "UNKNOWN"
+}
+
+// ShortString returns human readable short string
+func (s Severity) ShortString() string {
+	names := [...]string{"CRIT", "ERROR", "WARN", "INFO", "DEBUG"}
+	if s >= 0 && int(s) < len(names) {
+		return names[s]
+	}
+	return "UNKNOWN"
+}
diff --git a/logger/static.go b/logger/static.go
new file mode 100644
index 0000000..b7665e9
--- /dev/null
+++ b/logger/static.go
@@ -0,0 +1,99 @@
+package logger
+
+import "fmt"
+
+// Panic logs a message with the Critical severity then panic
+func Panic(a ...interface{}) {
+ s := fmt.Sprint(a...)
+ base.output(CRITICAL, 1, s)
+ panic(s)
+}
+
+// Critical logs a message with the Critical severity
+func Critical(a ...interface{}) {
+ base.output(CRITICAL, 1, fmt.Sprint(a...))
+}
+
+// Error logs a message with the Error severity
+func Error(a ...interface{}) {
+ base.output(ERROR, 1, fmt.Sprint(a...))
+}
+
+// Warning logs a message with the Warning severity
+func Warning(a ...interface{}) {
+ base.output(WARNING, 1, fmt.Sprint(a...))
+}
+
+// Info logs a message with the Info severity
+func Info(a ...interface{}) {
+ base.output(INFO, 1, fmt.Sprint(a...))
+}
+
+// Debug logs a message with the Debug severity
+func Debug(a ...interface{}) {
+ base.output(DEBUG, 1, fmt.Sprint(a...))
+}
+
+// Panicln logs a message with the Critical severity then panic
+func Panicln(a ...interface{}) {
+ s := fmt.Sprintln(a...)
+ base.output(CRITICAL, 1, s)
+ panic(s)
+}
+
+// Criticalln logs a message with the Critical severity
+func Criticalln(a ...interface{}) {
+ base.output(CRITICAL, 1, fmt.Sprintln(a...))
+}
+
+// Errorln logs a message with the Error severity
+func Errorln(a ...interface{}) {
+ base.output(ERROR, 1, fmt.Sprintln(a...))
+}
+
+// Warningln logs a message with the Warning severity
+func Warningln(a ...interface{}) {
+ base.output(WARNING, 1, fmt.Sprintln(a...))
+}
+
+// Infoln logs a message with the Info severity
+func Infoln(a ...interface{}) {
+ base.output(INFO, 1, fmt.Sprintln(a...))
+}
+
+// Debugln logs a message with the Debug severity
+func Debugln(a ...interface{}) {
+ base.output(DEBUG, 1, fmt.Sprintln(a...))
+}
+
+// Panicf logs a message with the Critical severity using the same syntax and options as fmt.Printf then panic
+func Panicf(format string, a ...interface{}) {
+ s := fmt.Sprintf(format, a...)
+ base.output(CRITICAL, 1, s)
+ panic(s)
+}
+
+// Criticalf logs a message with the Critical severity using the same syntax and options as fmt.Printf
+func Criticalf(format string, a ...interface{}) {
+ base.output(CRITICAL, 1, fmt.Sprintf(format, a...))
+}
+
+// Errorf logs a message with the Error severity using the same syntax and options as fmt.Printf
+func Errorf(format string, a ...interface{}) {
+ base.output(ERROR, 1, fmt.Sprintf(format, a...))
+}
+
+// Warningf logs a message with the Warning severity using the same syntax and options as fmt.Printf
+func Warningf(format string, a ...interface{}) {
+ base.output(WARNING, 1, fmt.Sprintf(format, a...))
+}
+
+// Infof logs a message with the Info severity using the same syntax and options as fmt.Printf
+func Infof(format string, a ...interface{}) {
+ base.output(INFO, 1, fmt.Sprintf(format, a...))
+}
+
+// Debugf logs a message with the Debug severity using the same syntax and options as fmt.Printf
+func Debugf(format string, a ...interface{}) {
+ base.output(DEBUG, 1, fmt.Sprintf(format, a...))
+}
diff --git a/mock.go b/mock.go
new file mode 100644
index 0000000..aa849fc
--- /dev/null
+++ b/mock.go
@@ -0,0 +1,94 @@
+package orchestrator
+
+type mockJob struct {
+ fullName func() string
+ moduleName func() string
+ name func() string
+ autodetection func() bool
+ autodetectionEvery func() int
+ retryAutodetection func() bool
+ panicked func() bool
+ tick func(int)
+ start func()
+ stop func()
+}
+
+// FullName returns mock job full name.
+func (m mockJob) FullName() string {
+ if m.fullName == nil {
+ return "mock"
+ }
+ return m.fullName()
+}
+
+// ModuleName returns mock job module name.
+func (m mockJob) ModuleName() string {
+ if m.moduleName == nil {
+ return "mock"
+ }
+ return m.moduleName()
+}
+
+// Name returns mock job name.
+func (m mockJob) Name() string {
+ if m.name == nil {
+ return "mock"
+ }
+ return m.name()
+}
+
+// AutoDetectionEvery returns mock job AutoDetectionEvery.
+func (m mockJob) AutoDetectionEvery() int {
+ if m.autodetectionEvery == nil {
+ return 0
+ }
+ return m.autodetectionEvery()
+}
+
+// AutoDetection returns mock job AutoDetection.
+func (m mockJob) AutoDetection() bool {
+ if m.autodetection == nil {
+ return true
+ }
+ return m.autodetection()
+}
+
+// RetryAutoDetection invokes mock job RetryAutoDetection.
+func (m mockJob) RetryAutoDetection() bool {
+ if m.retryAutodetection == nil {
+ return true
+ }
+ return m.retryAutodetection()
+}
+
+// Panicked return whether the mock job is panicked.
+func (m mockJob) Panicked() bool {
+ if m.panicked == nil {
+ return false
+ }
+ return m.panicked()
+}
+
+// Tick invokes mock job Tick.
+func (m mockJob) Tick(clock int) {
+ if m.tick == nil {
+ return
+ }
+ m.tick(clock)
+}
+
+// Start invokes mock job Start.
+func (m mockJob) Start() {
+ if m.start == nil {
+ return
+ }
+ m.start()
+}
+
+// Stop invokes mock job Stop.
+func (m mockJob) Stop() {
+ if m.stop == nil {
+ return
+ }
+ m.stop()
+}
diff --git a/mock_test.go b/mock_test.go
new file mode 100644
index 0000000..779426b
--- /dev/null
+++ b/mock_test.go
@@ -0,0 +1,87 @@
+package orchestrator
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestMockJob_FullName(t *testing.T) {
+ m := &mockJob{}
+ expected := "name"
+
+ assert.NotEqual(t, expected, m.FullName())
+ m.fullName = func() string { return expected }
+ assert.Equal(t, expected, m.FullName())
+}
+
+func TestMockJob_ModuleName(t *testing.T) {
+ m := &mockJob{}
+ expected := "name"
+
+ assert.NotEqual(t, expected, m.ModuleName())
+ m.moduleName = func() string { return expected }
+ assert.Equal(t, expected, m.ModuleName())
+}
+
+func TestMockJob_Name(t *testing.T) {
+ m := &mockJob{}
+ expected := "name"
+
+ assert.NotEqual(t, expected, m.Name())
+ m.name = func() string { return expected }
+ assert.Equal(t, expected, m.Name())
+}
+
+func TestMockJob_AutoDetectionEvery(t *testing.T) {
+ m := &mockJob{}
+ expected := -1
+
+ assert.NotEqual(t, expected, m.AutoDetectionEvery())
+ m.autodetectionEvery = func() int { return expected }
+ assert.Equal(t, expected, m.AutoDetectionEvery())
+}
+
+func TestMockJob_RetryAutoDetection(t *testing.T) {
+	m := &mockJob{}
+
+	// Default (no callback) is true; override with the opposite value to
+	// prove the callback is actually invoked (the original asserted true
+	// both times, which never exercised the override).
+	assert.True(t, m.RetryAutoDetection())
+	m.retryAutodetection = func() bool { return false }
+	assert.False(t, m.RetryAutoDetection())
+}
+
+func TestMockJob_AutoDetection(t *testing.T) {
+	m := &mockJob{}
+
+	// Default (no callback) is true; override with the opposite value to
+	// prove the callback is actually invoked (the original asserted true
+	// both times, which never exercised the override).
+	assert.True(t, m.AutoDetection())
+	m.autodetection = func() bool { return false }
+	assert.False(t, m.AutoDetection())
+}
+
+func TestMockJob_Panicked(t *testing.T) {
+ m := &mockJob{}
+
+ assert.False(t, m.Panicked())
+ m.panicked = func() bool { return true }
+ assert.True(t, m.Panicked())
+}
+
+func TestMockJob_Tick(t *testing.T) {
+ m := &mockJob{}
+
+ assert.NotPanics(t, func() { m.Tick(1) })
+}
+
+func TestMockJob_Start(t *testing.T) {
+ m := &mockJob{}
+
+ assert.NotPanics(t, func() { m.Start() })
+}
+
+func TestMockJob_Stop(t *testing.T) {
+ m := &mockJob{}
+
+ assert.NotPanics(t, func() { m.Stop() })
+}
diff --git a/module/charts.go b/module/charts.go
new file mode 100644
index 0000000..50ec46e
--- /dev/null
+++ b/module/charts.go
@@ -0,0 +1,442 @@
+package module
+
+import (
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+ "unicode"
+)
+
+type (
+ chartType string
+ dimAlgo string
+ dimDivMul int
+)
+
+const (
+ // Line chart type.
+ Line chartType = "line"
+ // Area chart type.
+ Area chartType = "area"
+ // Stacked chart type.
+ Stacked chartType = "stacked"
+
+ // Absolute dimension algorithm.
+ // The value is to drawn as-is (interpolated to second boundary).
+ Absolute dimAlgo = "absolute"
+ // Incremental dimension algorithm.
+ // The value increases over time, the difference from the last value is presented in the chart,
+ // the server interpolates the value and calculates a per second figure.
+ Incremental dimAlgo = "incremental"
+ // PercentOfAbsolute dimension algorithm.
+ // The percent of this value compared to the total of all dimensions.
+ PercentOfAbsolute dimAlgo = "percentage-of-absolute-row"
+ // PercentOfIncremental dimension algorithm.
+ // The percent of this value compared to the incremental total of all dimensions
+ PercentOfIncremental dimAlgo = "percentage-of-incremental-row"
+)
+
+// String returns the algorithm name, or "" for an unknown value.
+func (d dimAlgo) String() string {
+	if d == Absolute || d == Incremental || d == PercentOfAbsolute || d == PercentOfIncremental {
+		return string(d)
+	}
+	return ""
+}
+
+// String returns the chart type name, or "" for an unknown value.
+func (c chartType) String() string {
+	if c == Line || c == Area || c == Stacked {
+		return string(c)
+	}
+	return ""
+}
+
+// String returns the decimal form of the divisor/multiplier,
+// or "" when it is unset (zero).
+func (d dimDivMul) String() string {
+	if d == 0 {
+		return ""
+	}
+	return strconv.Itoa(int(d))
+}
+
+type (
+ // Charts is a collection of Charts.
+ Charts []*Chart
+
+ // Opts represents chart options.
+ Opts struct {
+ Obsolete bool
+ Detail bool
+ StoreFirst bool
+ Hidden bool
+ }
+
+ // Chart represents a chart.
+ // For the full description please visit https://docs.netdata.cloud/collectors/plugins.d/#chart
+ Chart struct {
+ // typeID is the unique identification of the chart, if not specified,
+ // the orchestrator will use job full name + chart ID as typeID (default behaviour).
+ typeID string
+
+ ID string
+ OverID string
+ Title string
+ Units string
+ Fam string
+ Ctx string
+ Type chartType
+ Priority int
+ Opts
+
+ Dims Dims
+ Vars Vars
+
+ Retries int
+
+ remove bool
+ // created flag is used to indicate whether the chart needs to be created by the orchestrator.
+ created bool
+ // updated flag is used to indicate whether the chart was updated on last data collection interval.
+ updated bool
+ }
+
+ // DimOpts represents dimension options.
+ DimOpts struct {
+ Obsolete bool
+ Hidden bool
+ NoReset bool
+ NoOverflow bool
+ }
+
+ // Dim represents a chart dimension.
+ // For detailed description please visit https://docs.netdata.cloud/collectors/plugins.d/#dimension.
+ Dim struct {
+ ID string
+ Name string
+ Algo dimAlgo
+ Mul dimDivMul
+ Div dimDivMul
+ DimOpts
+
+ remove bool
+ }
+
+ // Var represents a chart variable.
+ // For detailed description please visit https://docs.netdata.cloud/collectors/plugins.d/#variable
+ Var struct {
+ ID string
+ Value int64
+ }
+
+ // Dims is a collection of dims.
+ Dims []*Dim
+ // Vars is a collection of vars.
+ Vars []*Var
+)
+
+// String returns the enabled chart options in netdata format,
+// space-separated and in fixed order.
+func (o Opts) String() string {
+	var b strings.Builder
+	for _, opt := range []struct {
+		set  bool
+		name string
+	}{
+		{o.Detail, "detail"},
+		{o.Hidden, "hidden"},
+		{o.Obsolete, "obsolete"},
+		{o.StoreFirst, "store_first"},
+	} {
+		if !opt.set {
+			continue
+		}
+		if b.Len() > 0 {
+			b.WriteByte(' ')
+		}
+		b.WriteString(opt.name)
+	}
+	return b.String()
+}
+
+// String returns the enabled dimension options in netdata format,
+// space-separated and in fixed order.
+func (o DimOpts) String() string {
+	var b strings.Builder
+	for _, opt := range []struct {
+		set  bool
+		name string
+	}{
+		{o.Hidden, "hidden"},
+		{o.NoOverflow, "nooverflow"},
+		{o.NoReset, "noreset"},
+		{o.Obsolete, "obsolete"},
+	} {
+		if !opt.set {
+			continue
+		}
+		if b.Len() > 0 {
+			b.WriteByte(' ')
+		}
+		b.WriteString(opt.name)
+	}
+	return b.String()
+}
+
+// Add adds (appends) a variable number of Charts.
+func (c *Charts) Add(charts ...*Chart) error {
+ for _, chart := range charts {
+ err := checkChart(chart)
+ if err != nil {
+ return fmt.Errorf("error on adding chart : %s", err)
+ }
+ if c.Has(chart.ID) {
+ return fmt.Errorf("error on adding chart : '%s' is already in charts", chart.ID)
+ }
+ *c = append(*c, chart)
+ }
+
+ return nil
+}
+
+// Get returns the chart by ID.
+// Returns nil when no chart with the given ID exists.
+func (c Charts) Get(chartID string) *Chart {
+	idx := c.index(chartID)
+	if idx == -1 {
+		return nil
+	}
+	return c[idx]
+}
+
+// Has returns true if ChartsFunc contain the chart with the given ID, false otherwise.
+func (c Charts) Has(chartID string) bool {
+	return c.index(chartID) != -1
+}
+
+// Remove removes the chart from Charts by ID.
+// Avoid to use it in runtime.
+func (c *Charts) Remove(chartID string) error {
+	idx := c.index(chartID)
+	if idx == -1 {
+		return fmt.Errorf("error on removing chart : '%s' is not in charts", chartID)
+	}
+	// Shift the tail left and nil out the vacated last slot so the removed
+	// *Chart is not kept reachable through the backing array.
+	copy((*c)[idx:], (*c)[idx+1:])
+	(*c)[len(*c)-1] = nil
+	*c = (*c)[:len(*c)-1]
+	return nil
+}
+
+// Copy returns a deep copy of ChartsFunc.
+func (c Charts) Copy() *Charts {
+	charts := Charts{}
+	for idx := range c {
+		charts = append(charts, c[idx].Copy())
+	}
+	return &charts
+}
+
+// index returns the position of the chart with the given ID, or -1.
+func (c Charts) index(chartID string) int {
+	for idx := range c {
+		if c[idx].ID == chartID {
+			return idx
+		}
+	}
+	return -1
+}
+
+// MarkNotCreated changes 'created' chart flag to false.
+// Use it to add dimension in runtime.
+func (c *Chart) MarkNotCreated() {
+ c.created = false
+}
+
+// MarkRemove sets 'remove' flag and Obsolete option to true.
+// Use it to remove chart in runtime.
+func (c *Chart) MarkRemove() {
+ c.Obsolete = true
+ c.remove = true
+}
+
+// MarkDimRemove sets 'remove' flag, Obsolete and optionally Hidden options to true.
+// Use it to remove dimension in runtime.
+func (c *Chart) MarkDimRemove(dimID string, hide bool) error {
+ if !c.HasDim(dimID) {
+ return fmt.Errorf("chart '%s' has no '%s' dimension", c.ID, dimID)
+ }
+ dim := c.GetDim(dimID)
+ dim.Obsolete = true
+ if hide {
+ dim.Hidden = true
+ }
+ dim.remove = true
+ return nil
+}
+
+// AddDim adds new dimension to the chart dimensions.
+func (c *Chart) AddDim(newDim *Dim) error {
+ err := checkDim(newDim)
+ if err != nil {
+ return fmt.Errorf("error on adding dim to chart '%s' : %s", c.ID, err)
+ }
+ if c.HasDim(newDim.ID) {
+ return fmt.Errorf("error on adding dim : '%s' is already in chart '%s' dims", newDim.ID, c.ID)
+ }
+ c.Dims = append(c.Dims, newDim)
+
+ return nil
+}
+
+// AddVar adds new variable to the chart variables.
+func (c *Chart) AddVar(newVar *Var) error {
+ err := checkVar(newVar)
+ if err != nil {
+ return fmt.Errorf("error on adding var to chart '%s' : %s", c.ID, err)
+ }
+ if c.indexVar(newVar.ID) != -1 {
+ return fmt.Errorf("error on adding var : '%s' is already in chart '%s' vars", newVar.ID, c.ID)
+ }
+ c.Vars = append(c.Vars, newVar)
+
+ return nil
+}
+
+// GetDim returns dimension by ID.
+func (c *Chart) GetDim(dimID string) *Dim {
+ idx := c.indexDim(dimID)
+ if idx == -1 {
+ return nil
+ }
+ return c.Dims[idx]
+}
+
+// RemoveDim removes dimension by ID.
+// Avoid to use it in runtime.
+func (c *Chart) RemoveDim(dimID string) error {
+ idx := c.indexDim(dimID)
+ if idx == -1 {
+ return fmt.Errorf("error on removing dim : '%s' isn't in chart '%s'", dimID, c.ID)
+ }
+ c.Dims = append(c.Dims[:idx], c.Dims[idx+1:]...)
+
+ return nil
+}
+
+// HasDim returns true if the chart contains dimension with the given ID, false otherwise.
+func (c Chart) HasDim(dimID string) bool {
+ return c.indexDim(dimID) != -1
+}
+
+// Copy returns a deep copy of the chart.
+func (c Chart) Copy() *Chart {
+ chart := c
+ chart.Dims = Dims{}
+ chart.Vars = Vars{}
+
+ for idx := range c.Dims {
+ chart.Dims = append(chart.Dims, c.Dims[idx].copy())
+ }
+ for idx := range c.Vars {
+ chart.Vars = append(chart.Vars, c.Vars[idx].copy())
+ }
+
+ return &chart
+}
+
+func (c Chart) indexDim(dimID string) int {
+ for idx := range c.Dims {
+ if c.Dims[idx].ID == dimID {
+ return idx
+ }
+ }
+ return -1
+}
+
+func (c Chart) indexVar(varID string) int {
+ for idx := range c.Vars {
+ if c.Vars[idx].ID == varID {
+ return idx
+ }
+ }
+ return -1
+}
+
+func (d Dim) copy() *Dim {
+ return &d
+}
+
+func (v Var) copy() *Var {
+ return &v
+}
+
+func checkCharts(charts ...*Chart) error {
+ for _, chart := range charts {
+ err := checkChart(chart)
+ if err != nil {
+ return fmt.Errorf("chart '%s' : %v", chart.ID, err)
+ }
+ }
+ return nil
+}
+
+func checkChart(chart *Chart) error {
+ if chart.ID == "" {
+ return errors.New("empty ID")
+ }
+
+ if chart.Title == "" {
+ return errors.New("empty Title")
+ }
+
+ if chart.Units == "" {
+ return errors.New("empty Units")
+ }
+
+ if id := checkID(chart.ID); id != -1 {
+ return fmt.Errorf("unacceptable symbol in ID : '%s'", string(id))
+ }
+
+ set := make(map[string]bool)
+
+ for _, d := range chart.Dims {
+ err := checkDim(d)
+ if err != nil {
+ return err
+ }
+ if set[d.ID] {
+ return fmt.Errorf("duplicate dim '%s'", d.ID)
+ }
+ set[d.ID] = true
+ }
+
+ set = make(map[string]bool)
+
+ for _, v := range chart.Vars {
+ if err := checkVar(v); err != nil {
+ return err
+ }
+ if set[v.ID] {
+ return fmt.Errorf("duplicate var '%s'", v.ID)
+ }
+ set[v.ID] = true
+ }
+ return nil
+}
+
+func checkDim(d *Dim) error {
+ if d.ID == "" {
+ return errors.New("empty dim ID")
+ }
+ if id := checkID(d.ID); id != -1 {
+ return fmt.Errorf("unacceptable symbol in dim ID '%s' : '%s'", d.ID, string(id))
+ }
+ return nil
+}
+
+func checkVar(v *Var) error {
+ if v.ID == "" {
+ return errors.New("empty var ID")
+ }
+ if id := checkID(v.ID); id != -1 {
+ return fmt.Errorf("unacceptable symbol in var ID '%s' : '%s'", v.ID, string(id))
+ }
+ return nil
+}
+
+func checkID(id string) int {
+ for _, r := range id {
+ if unicode.IsSpace(r) {
+ return int(r)
+ }
+ }
+ return -1
+}
diff --git a/module/charts_test.go b/module/charts_test.go
new file mode 100644
index 0000000..7712000
--- /dev/null
+++ b/module/charts_test.go
@@ -0,0 +1,374 @@
+package module
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func createTestChart(id string) *Chart {
+ return &Chart{
+ ID: id,
+ Title: "Title",
+ Units: "units",
+ Fam: "family",
+ Ctx: "context",
+ Type: Line,
+ Dims: Dims{
+ {ID: "dim1", Algo: Absolute},
+ },
+ Vars: Vars{
+ {ID: "var1", Value: 1},
+ },
+ }
+}
+
+func TestDimAlgo_String(t *testing.T) {
+ cases := []struct {
+ expected string
+ actual fmt.Stringer
+ }{
+ {"line", Line},
+ {"area", Area},
+ {"stacked", Stacked},
+ {"", dimAlgo("wrong")},
+ }
+
+ for _, v := range cases {
+ assert.Equal(t, v.expected, v.actual.String())
+ }
+}
+
+func TestChartType_String(t *testing.T) {
+ cases := []struct {
+ expected string
+ actual fmt.Stringer
+ }{
+ {"absolute", Absolute},
+ {"incremental", Incremental},
+ {"percentage-of-absolute-row", PercentOfAbsolute},
+ {"percentage-of-incremental-row", PercentOfIncremental},
+ {"", chartType("wrong")},
+ }
+
+ for _, v := range cases {
+ assert.Equal(t, v.expected, v.actual.String())
+ }
+}
+
+func TestDimDivMul_String(t *testing.T) {
+ cases := []struct {
+ expected string
+ actual fmt.Stringer
+ }{
+ {"", dimDivMul(0)},
+ {"1", dimDivMul(1)},
+ {"-1", dimDivMul(-1)},
+ }
+
+ for _, v := range cases {
+ assert.Equal(t, v.expected, v.actual.String())
+ }
+}
+
+func TestOpts_String(t *testing.T) {
+ cases := []struct {
+ expected string
+ actual fmt.Stringer
+ }{
+ {"", Opts{}},
+ {
+ "detail hidden obsolete store_first",
+ Opts{Detail: true, Hidden: true, Obsolete: true, StoreFirst: true},
+ },
+ {
+ "detail hidden obsolete store_first",
+ Opts{Detail: true, Hidden: true, Obsolete: true, StoreFirst: true},
+ },
+ }
+
+ for _, v := range cases {
+ assert.Equal(t, v.expected, v.actual.String())
+ }
+}
+
+func TestDimOpts_String(t *testing.T) {
+ cases := []struct {
+ expected string
+ actual fmt.Stringer
+ }{
+ {"", DimOpts{}},
+ {
+ "hidden nooverflow noreset obsolete",
+ DimOpts{Hidden: true, NoOverflow: true, NoReset: true, Obsolete: true},
+ },
+ {
+ "hidden obsolete",
+ DimOpts{Hidden: true, NoOverflow: false, NoReset: false, Obsolete: true},
+ },
+ }
+
+ for _, v := range cases {
+ assert.Equal(t, v.expected, v.actual.String())
+ }
+}
+
+func TestCharts_Copy(t *testing.T) {
+ orig := &Charts{
+ createTestChart("1"),
+ createTestChart("2"),
+ }
+ copied := orig.Copy()
+
+ require.False(t, orig == copied, "Charts copy points to the same address")
+ require.Len(t, *orig, len(*copied))
+
+ for idx := range *orig {
+ compareCharts(t, (*orig)[idx], (*copied)[idx])
+ }
+}
+
+func TestChart_Copy(t *testing.T) {
+ orig := createTestChart("1")
+
+ compareCharts(t, orig, orig.Copy())
+}
+
+func TestCharts_Add(t *testing.T) {
+ charts := Charts{}
+ chart1 := createTestChart("1")
+ chart2 := createTestChart("2")
+ chart3 := createTestChart("")
+
+ // OK case
+ assert.NoError(t, charts.Add(
+ chart1,
+ chart2,
+ ))
+ assert.Len(t, charts, 2)
+
+ // NG case
+ assert.Error(t, charts.Add(
+ chart3,
+ chart1,
+ chart2,
+ ))
+ assert.Len(t, charts, 2)
+
+ assert.True(t, charts[0] == chart1)
+ assert.True(t, charts[1] == chart2)
+}
+
+func TestCharts_Get(t *testing.T) {
+ chart := createTestChart("1")
+ charts := Charts{
+ chart,
+ }
+
+ // OK case
+ assert.True(t, chart == charts.Get("1"))
+ // NG case
+ assert.Nil(t, charts.Get("2"))
+}
+
+func TestCharts_Has(t *testing.T) {
+ chart := createTestChart("1")
+ charts := &Charts{
+ chart,
+ }
+
+ // OK case
+ assert.True(t, charts.Has("1"))
+ // NG case
+ assert.False(t, charts.Has("2"))
+}
+
+func TestCharts_Remove(t *testing.T) {
+ chart := createTestChart("1")
+ charts := &Charts{
+ chart,
+ }
+
+ // OK case
+ assert.NoError(t, charts.Remove("1"))
+ assert.Len(t, *charts, 0)
+
+ // NG case
+ assert.Error(t, charts.Remove("2"))
+}
+
+func TestChart_AddDim(t *testing.T) {
+ chart := createTestChart("1")
+ dim := &Dim{ID: "dim2"}
+
+ // OK case
+ assert.NoError(t, chart.AddDim(dim))
+ assert.Len(t, chart.Dims, 2)
+
+ // NG case
+ assert.Error(t, chart.AddDim(dim))
+ assert.Len(t, chart.Dims, 2)
+}
+
+func TestChart_AddVar(t *testing.T) {
+ chart := createTestChart("1")
+ variable := &Var{ID: "var2"}
+
+ // OK case
+ assert.NoError(t, chart.AddVar(variable))
+ assert.Len(t, chart.Vars, 2)
+
+ // NG case
+ assert.Error(t, chart.AddVar(variable))
+ assert.Len(t, chart.Vars, 2)
+}
+
+func TestChart_GetDim(t *testing.T) {
+ chart := &Chart{
+ Dims: Dims{
+ {ID: "1"},
+ {ID: "2"},
+ },
+ }
+
+ // OK case
+ assert.True(t, chart.GetDim("1") != nil && chart.GetDim("1").ID == "1")
+
+ // NG case
+ assert.Nil(t, chart.GetDim("3"))
+}
+
+func TestChart_RemoveDim(t *testing.T) {
+ chart := createTestChart("1")
+
+ // OK case
+ assert.NoError(t, chart.RemoveDim("dim1"))
+ assert.Len(t, chart.Dims, 0)
+
+ // NG case
+ assert.Error(t, chart.RemoveDim("dim2"))
+}
+
+func TestChart_HasDim(t *testing.T) {
+ chart := createTestChart("1")
+
+ // OK case
+ assert.True(t, chart.HasDim("dim1"))
+ // NG case
+ assert.False(t, chart.HasDim("dim2"))
+}
+
+func TestChart_MarkNotCreated(t *testing.T) {
+ chart := createTestChart("1")
+
+ chart.MarkNotCreated()
+ assert.False(t, chart.created)
+}
+
+func TestChart_MarkRemove(t *testing.T) {
+ chart := createTestChart("1")
+
+ chart.MarkRemove()
+ assert.True(t, chart.remove)
+ assert.True(t, chart.Obsolete)
+}
+
+func TestChart_MarkDimRemove(t *testing.T) {
+ chart := createTestChart("1")
+
+ assert.Error(t, chart.MarkDimRemove("dim99", false))
+ assert.NoError(t, chart.MarkDimRemove("dim1", true))
+ assert.True(t, chart.GetDim("dim1").Obsolete)
+ assert.True(t, chart.GetDim("dim1").Hidden)
+ assert.True(t, chart.GetDim("dim1").remove)
+}
+
+func TestChart_check(t *testing.T) {
+ // OK case
+ chart := createTestChart("1")
+ assert.NoError(t, checkChart(chart))
+
+ // NG case
+ chart = createTestChart("1")
+ chart.ID = ""
+ assert.Error(t, checkChart(chart))
+
+ chart = createTestChart("1")
+ chart.ID = "invalid id"
+ assert.Error(t, checkChart(chart))
+
+ chart = createTestChart("1")
+ chart.Title = ""
+ assert.Error(t, checkChart(chart))
+
+ chart = createTestChart("1")
+ chart.Units = ""
+ assert.Error(t, checkChart(chart))
+
+ chart = createTestChart("1")
+ chart.Dims = Dims{
+ {ID: "1"},
+ {ID: "1"},
+ }
+ assert.Error(t, checkChart(chart))
+
+ chart = createTestChart("1")
+ chart.Vars = Vars{
+ {ID: "1"},
+ {ID: "1"},
+ }
+ assert.Error(t, checkChart(chart))
+}
+
+func TestDim_check(t *testing.T) {
+ // OK case
+ dim := &Dim{ID: "id"}
+ assert.NoError(t, checkDim(dim))
+
+ // NG case
+ dim = &Dim{ID: "id"}
+ dim.ID = ""
+ assert.Error(t, checkDim(dim))
+
+ dim = &Dim{ID: "id"}
+ dim.ID = "invalid id"
+ assert.Error(t, checkDim(dim))
+}
+
+func TestVar_check(t *testing.T) {
+ // OK case
+ v := &Var{ID: "id"}
+ assert.NoError(t, checkVar(v))
+
+ // NG case
+ v = &Var{ID: "id"}
+ v.ID = ""
+ assert.Error(t, checkVar(v))
+
+ v = &Var{ID: "id"}
+ v.ID = "invalid id"
+ assert.Error(t, checkVar(v))
+}
+
+func compareCharts(t *testing.T, orig, copied *Chart) {
+ // 1. compare chart pointers
+ // 2. compare Dims, Vars length
+ // 3. compare Dims, Vars pointers
+
+ assert.False(t, orig == copied, "Chart copy ChartsFunc points to the same address")
+
+ require.Len(t, orig.Dims, len(copied.Dims))
+ require.Len(t, orig.Vars, len(copied.Vars))
+
+ for idx := range (*orig).Dims {
+ assert.False(t, orig.Dims[idx] == copied.Dims[idx], "Chart copy dim points to the same address")
+ assert.Equal(t, orig.Dims[idx], copied.Dims[idx], "Chart copy dim isn't equal to orig")
+ }
+
+ for idx := range (*orig).Vars {
+ assert.False(t, orig.Vars[idx] == copied.Vars[idx], "Chart copy var points to the same address")
+ assert.Equal(t, orig.Vars[idx], copied.Vars[idx], "Chart copy var isn't equal to orig")
+ }
+}
diff --git a/module/job.go b/module/job.go
new file mode 100644
index 0000000..6bc51a1
--- /dev/null
+++ b/module/job.go
@@ -0,0 +1,423 @@
+package module
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/netdata/go-orchestrator/logger"
+)
+
+const (
+ penaltyStep = 5
+ maxPenalty = 600
+ infTries = -1
+)
+
+// NewJob returns new job.
+func NewJob(pluginName string, moduleName string, module Module, out io.Writer) *Job {
+ runtimeChart := &Chart{
+ typeID: "netdata",
+ Units: "ms",
+ Fam: pluginName,
+ Ctx: "netdata.go_plugin_execution_time", Priority: 145000,
+ Dims: Dims{
+ {ID: "time"},
+ },
+ }
+ buf := &bytes.Buffer{}
+
+ return &Job{
+ AutoDetectTries: infTries,
+ pluginName: pluginName,
+ moduleName: moduleName,
+ module: module,
+ runtimeChart: runtimeChart,
+ out: out,
+ stopHook: make(chan struct{}, 1),
+ tick: make(chan int),
+ buf: buf,
+ apiWriter: apiWriter{Writer: buf},
+ }
+}
+
+// Job represents a job. It's a module wrapper.
+type Job struct {
+ Nam string `yaml:"name"`
+ UpdateEvery int `yaml:"update_every"`
+ AutoDetectEvery int `yaml:"autodetection_retry"`
+ AutoDetectTries int `yaml:"-"`
+ Priority int `yaml:"priority"`
+
+ *logger.Logger
+
+ pluginName string
+ moduleName string
+ module Module
+
+ initialized bool
+ panicked bool
+
+ stopHook chan struct{}
+ runtimeChart *Chart
+ charts *Charts
+ tick chan int
+ out io.Writer
+ buf *bytes.Buffer
+ apiWriter apiWriter
+
+ retries int
+ prevRun time.Time
+}
+
+// FullName returns full name.
+// If name isn't specified or equal to module name it returns module name.
+func (j Job) FullName() string {
+ if j.Nam == "" || j.Nam == j.moduleName {
+ return j.ModuleName()
+ }
+ return fmt.Sprintf("%s_%s", j.ModuleName(), j.Name())
+}
+
+// ModuleName returns module name.
+func (j Job) ModuleName() string {
+ return j.moduleName
+}
+
+// Name returns name.
+// If name isn't specified it returns module name.
+func (j Job) Name() string {
+ if j.Nam == "" {
+ return j.moduleName
+ }
+ return j.Nam
+}
+
+// Panicked returns 'panicked' flag value.
+func (j Job) Panicked() bool {
+ return j.panicked
+}
+
+func (j *Job) AutoDetection() (ok bool) {
+ defer func() {
+ if r := recover(); r != nil {
+ ok = false
+ j.Errorf("PANIC %v", r)
+ j.panicked = true
+ j.disableAutodetection()
+ }
+ if !ok {
+ j.module.Cleanup()
+ }
+ }()
+
+ if ok = j.init(); !ok {
+ j.Error("Init failed")
+ j.disableAutodetection()
+ return
+ }
+ if ok = j.check(); !ok {
+ j.Error("Check failed")
+ return
+ }
+ if ok = j.postCheck(); !ok {
+ j.Error("PostCheck failed")
+ j.disableAutodetection()
+ return
+ }
+ return true
+}
+
+func (j *Job) disableAutodetection() {
+ j.AutoDetectEvery = 0
+}
+
+// init calls module Init and returns its value.
+func (j *Job) init() bool {
+ if j.initialized {
+ return true
+ }
+
+ limitedLogger := logger.NewLimited(j.pluginName, j.ModuleName(), j.Name())
+ j.Logger = limitedLogger
+ j.module.SetLogger(limitedLogger)
+
+ ok := j.module.Init()
+ if ok {
+ j.initialized = true
+ }
+ return ok
+}
+
+// check calls module check and returns its value.
+func (j *Job) check() bool {
+ ok := j.module.Check()
+ if !ok {
+ if j.AutoDetectTries != infTries {
+ j.AutoDetectTries--
+ }
+ }
+ return ok
+}
+
+// postCheck calls module Charts and validates the returned charts.
+func (j *Job) postCheck() bool {
+ j.charts = j.module.Charts()
+ if j.charts == nil {
+ j.Error("Charts can't be nil")
+ return false
+ }
+ if err := checkCharts(*j.charts...); err != nil {
+ j.Errorf("error on checking charts : %v", err)
+ return false
+ }
+ return true
+}
+
+// Tick sends the clock tick to the job, dropping it if the previous run hasn't finished.
+func (j *Job) Tick(clock int) {
+ select {
+ case j.tick <- clock:
+ default:
+ j.Debug("Skip the tick due to previous run hasn't been finished")
+ }
+}
+
+// Start simply invokes MainLoop.
+func (j *Job) Start() {
+ j.MainLoop()
+}
+
+// Stop stops MainLoop.
+func (j *Job) Stop() {
+ j.stopHook <- struct{}{}
+}
+
+// MainLoop is a job main function.
+func (j *Job) MainLoop() {
+LOOP:
+ for {
+ select {
+ case <-j.stopHook:
+ j.module.Cleanup()
+ break LOOP
+ case t := <-j.tick:
+ doRun := t%(j.UpdateEvery+j.penalty()) == 0
+ if doRun {
+ j.runOnce()
+ }
+ }
+ }
+}
+
+func (j *Job) runOnce() {
+ curTime := time.Now()
+ sinceLastRun := calcSinceLastRun(curTime, j.prevRun)
+ j.prevRun = curTime
+
+ metrics := j.collect()
+
+ if j.panicked {
+ return
+ }
+
+ if j.processMetrics(metrics, curTime, sinceLastRun) {
+ j.retries = 0
+ } else {
+ j.retries++
+ }
+
+ _, _ = io.Copy(j.out, j.buf)
+ j.buf.Reset()
+}
+
+// AutoDetectionEvery returns the value of AutoDetectEvery.
+func (j Job) AutoDetectionEvery() int {
+ return j.AutoDetectEvery
+}
+
+// RetryAutoDetection reports whether autodetection should be retried.
+func (j Job) RetryAutoDetection() bool {
+ return j.AutoDetectEvery > 0 && (j.AutoDetectTries == infTries || j.AutoDetectTries > 0)
+}
+
+func (j *Job) collect() (result map[string]int64) {
+ j.panicked = false
+ defer func() {
+ if r := recover(); r != nil {
+ j.Errorf("PANIC: %v", r)
+ j.panicked = true
+ }
+ }()
+ return j.module.Collect()
+}
+
+func (j *Job) processMetrics(metrics map[string]int64, startTime time.Time, sinceLastRun int) bool {
+ if !j.runtimeChart.created {
+ j.runtimeChart.ID = fmt.Sprintf("execution_time_of_%s", j.FullName())
+ j.runtimeChart.Title = fmt.Sprintf("Execution Time for %s", j.FullName())
+ j.createChart(j.runtimeChart)
+ }
+
+ var (
+ remove []string
+ totalUpdated int
+ elapsed = int64(durationTo(time.Now().Sub(startTime), time.Millisecond))
+ )
+
+ for _, chart := range *j.charts {
+ if !chart.created {
+ j.createChart(chart)
+ }
+ if chart.remove {
+ remove = append(remove, chart.ID)
+ continue
+ }
+ if len(metrics) == 0 || chart.Obsolete {
+ continue
+ }
+ if j.updateChart(chart, metrics, sinceLastRun) {
+ totalUpdated++
+ }
+
+ }
+
+ for _, id := range remove {
+ _ = j.charts.Remove(id)
+ }
+
+ if totalUpdated == 0 {
+ return false
+ }
+
+ j.updateChart(j.runtimeChart, map[string]int64{"time": elapsed}, sinceLastRun)
+
+ return true
+}
+
+func (j *Job) createChart(chart *Chart) {
+ if chart.Priority == 0 {
+ chart.Priority = j.Priority
+ j.Priority++
+ }
+ _ = j.apiWriter.chart(
+ firstNotEmpty(chart.typeID, j.FullName()),
+ chart.ID,
+ chart.OverID,
+ chart.Title,
+ chart.Units,
+ chart.Fam,
+ chart.Ctx,
+ chart.Type,
+ chart.Priority,
+ j.UpdateEvery,
+ chart.Opts,
+ j.pluginName,
+ j.moduleName,
+ )
+ for _, dim := range chart.Dims {
+ _ = j.apiWriter.dimension(
+ dim.ID,
+ dim.Name,
+ dim.Algo,
+ dim.Mul,
+ dim.Div,
+ dim.DimOpts,
+ )
+ }
+ for _, v := range chart.Vars {
+ _ = j.apiWriter.varSet(
+ v.ID,
+ v.Value,
+ )
+ }
+ _, _ = j.apiWriter.Write([]byte("\n"))
+
+ chart.created = true
+}
+
+func (j *Job) updateChart(chart *Chart, data map[string]int64, sinceLastRun int) bool {
+ if !chart.updated {
+ sinceLastRun = 0
+ }
+
+ _ = j.apiWriter.begin(
+ firstNotEmpty(chart.typeID, j.FullName()),
+ chart.ID,
+ sinceLastRun,
+ )
+
+ var (
+ remove []string
+ updated int
+ )
+
+ for _, dim := range chart.Dims {
+ if dim.remove {
+ remove = append(remove, dim.ID)
+ continue
+ }
+ v, ok := data[dim.ID]
+
+ if !ok {
+ _ = j.apiWriter.dimSetEmpty(dim.ID)
+ } else {
+ _ = j.apiWriter.dimSet(dim.ID, v)
+ updated++
+ }
+ }
+
+ for _, id := range remove {
+ _ = chart.RemoveDim(id)
+ }
+
+ for _, variable := range chart.Vars {
+ v, ok := data[variable.ID]
+ if ok {
+ _ = j.apiWriter.varSet(variable.ID, v)
+ }
+ }
+
+ _ = j.apiWriter.end()
+
+ chart.updated = updated > 0
+
+ if chart.updated {
+ chart.Retries = 0
+ } else {
+ chart.Retries++
+ }
+
+ return chart.updated
+}
+
+func (j Job) penalty() int {
+ v := j.retries / penaltyStep * penaltyStep * j.UpdateEvery / 2
+ if v > maxPenalty {
+ return maxPenalty
+ }
+ return v
+}
+
+func calcSinceLastRun(curTime, prevRun time.Time) int {
+ if prevRun.IsZero() {
+ return 0
+ }
+ // monotonic
+ // durationTo(curTime.Sub(prevRun), time.Microsecond)
+ return int((curTime.UnixNano() - prevRun.UnixNano()) / 1000)
+}
+
+func durationTo(duration time.Duration, to time.Duration) int {
+ return int(int64(duration) / (int64(to) / int64(time.Nanosecond)))
+}
+
+func firstNotEmpty(values ...string) string {
+ for _, v := range values {
+ if v != "" {
+ return v
+ }
+ }
+ return ""
+}
diff --git a/module/job_test.go b/module/job_test.go
new file mode 100644
index 0000000..a309b14
--- /dev/null
+++ b/module/job_test.go
@@ -0,0 +1,311 @@
+package module
+
+import (
+ "fmt"
+ "io/ioutil"
+ "testing"
+ "time"
+
+ "github.com/netdata/go-orchestrator/logger"
+
+ "github.com/stretchr/testify/assert"
+)
+
+var (
+ testPluginName = "pluginTest"
+ testModName = "modName"
+ testJobName = "jobName"
+)
+
+func newTestJob() *Job {
+ return NewJob(
+ testPluginName,
+ testModName,
+ nil,
+ ioutil.Discard,
+ )
+}
+
+func TestNewJob(t *testing.T) {
+ assert.IsType(t, (*Job)(nil), newTestJob())
+}
+
+func TestJob_FullName(t *testing.T) {
+ job := newTestJob()
+
+ assert.Equal(t, job.FullName(), testModName)
+ job.Nam = testModName
+ assert.Equal(t, job.FullName(), testModName)
+ job.Nam = testJobName
+ assert.Equal(t, job.FullName(), fmt.Sprintf("%s_%s", testModName, testJobName))
+
+}
+
+func TestJob_ModuleName(t *testing.T) {
+ job := newTestJob()
+
+ assert.Equal(t, job.ModuleName(), testModName)
+}
+
+func TestJob_Name(t *testing.T) {
+ job := newTestJob()
+
+ assert.Equal(t, job.Name(), testModName)
+ job.Nam = testJobName
+ assert.Equal(t, job.Name(), testJobName)
+}
+
+func TestJob_Panicked(t *testing.T) {
+ job := newTestJob()
+
+ assert.Equal(t, job.Panicked(), job.panicked)
+ job.panicked = true
+ assert.Equal(t, job.Panicked(), job.panicked)
+}
+
+func TestJob_AutoDetectionEvery(t *testing.T) {
+ job := newTestJob()
+
+ assert.Equal(t, job.AutoDetectionEvery(), job.AutoDetectEvery)
+ job.AutoDetectEvery = 1
+ assert.Equal(t, job.AutoDetectionEvery(), job.AutoDetectEvery)
+}
+
+func TestJob_RetryAutoDetection(t *testing.T) {
+ job := newTestJob()
+ m := &MockModule{
+ InitFunc: func() bool {
+ return true
+ },
+ CheckFunc: func() bool { return false },
+ ChartsFunc: func() *Charts {
+ return &Charts{}
+ },
+ }
+ job.module = m
+ job.AutoDetectEvery = 1
+
+ assert.True(t, job.RetryAutoDetection())
+ assert.Equal(t, infTries, job.AutoDetectTries)
+ for i := 0; i < 1000; i++ {
+ job.check()
+ }
+ assert.True(t, job.RetryAutoDetection())
+ assert.Equal(t, infTries, job.AutoDetectTries)
+
+ job.AutoDetectTries = 10
+ for i := 0; i < 10; i++ {
+ job.check()
+ }
+ assert.False(t, job.RetryAutoDetection())
+ assert.Equal(t, 0, job.AutoDetectTries)
+}
+
+func TestJob_AutoDetection(t *testing.T) {
+ job := newTestJob()
+ var v int
+ m := &MockModule{
+ InitFunc: func() bool {
+ v++
+ return true
+ },
+ CheckFunc: func() bool {
+ v++
+ return true
+ },
+ ChartsFunc: func() *Charts {
+ v++
+ return &Charts{}
+ },
+ }
+ job.module = m
+
+ assert.True(t, job.AutoDetection())
+ assert.Equal(t, 3, v)
+}
+
+func TestJob_AutoDetection_FailInit(t *testing.T) {
+ job := newTestJob()
+ m := &MockModule{
+ InitFunc: func() bool {
+ return false
+ },
+ }
+ job.module = m
+
+ assert.False(t, job.AutoDetection())
+ assert.True(t, m.CleanupDone)
+}
+
+func TestJob_AutoDetection_FailCheck(t *testing.T) {
+ job := newTestJob()
+ m := &MockModule{
+ InitFunc: func() bool {
+ return true
+ },
+ CheckFunc: func() bool {
+ return false
+ },
+ }
+ job.module = m
+
+ assert.False(t, job.AutoDetection())
+ assert.True(t, m.CleanupDone)
+}
+
+func TestJob_AutoDetection_FailPostCheck(t *testing.T) {
+ job := newTestJob()
+ m := &MockModule{
+ InitFunc: func() bool {
+ return true
+ },
+ CheckFunc: func() bool {
+ return true
+ },
+ ChartsFunc: func() *Charts {
+ return nil
+ },
+ }
+ job.module = m
+
+ assert.False(t, job.AutoDetection())
+ assert.True(t, m.CleanupDone)
+}
+
+func TestJob_AutoDetection_PanicInit(t *testing.T) {
+ job := newTestJob()
+ m := &MockModule{
+ InitFunc: func() bool {
+ panic("panic in Init")
+ },
+ }
+ job.module = m
+
+ assert.False(t, job.AutoDetection())
+ assert.True(t, m.CleanupDone)
+}
+
+func TestJob_AutoDetection_PanicCheck(t *testing.T) {
+ job := newTestJob()
+ m := &MockModule{
+ InitFunc: func() bool {
+ return true
+ },
+ CheckFunc: func() bool {
+ panic("panic in Check")
+ },
+ }
+ job.module = m
+
+ assert.False(t, job.AutoDetection())
+ assert.True(t, m.CleanupDone)
+}
+
+func TestJob_AutoDetection_PanicPostCheck(t *testing.T) {
+ job := newTestJob()
+ m := &MockModule{
+ InitFunc: func() bool {
+ return true
+ },
+ CheckFunc: func() bool {
+ return true
+ },
+ ChartsFunc: func() *Charts {
+ panic("panic in PostCheck")
+ },
+ }
+ job.module = m
+
+ assert.False(t, job.AutoDetection())
+ assert.True(t, m.CleanupDone)
+}
+
+func TestJob_MainLoop(t *testing.T) {
+ m := &MockModule{
+ ChartsFunc: func() *Charts {
+ return &Charts{
+ &Chart{
+ ID: "id",
+ Title: "title",
+ Units: "units",
+ Dims: Dims{
+ {ID: "id1"},
+ {ID: "id2"},
+ },
+ },
+ }
+ },
+ CollectFunc: func() map[string]int64 {
+ return map[string]int64{
+ "id1": 1,
+ "id2": 2,
+ }
+ },
+ }
+ job := newTestJob()
+ job.module = m
+ job.charts = job.module.Charts()
+ job.UpdateEvery = 1
+
+ go func() {
+ for i := 1; i < 3; i++ {
+ job.Tick(i)
+ time.Sleep(time.Second)
+ }
+ job.Stop()
+ }()
+
+ job.MainLoop()
+
+ assert.True(t, m.CleanupDone)
+}
+
+func TestJob_MainLoop_Panic(t *testing.T) {
+ m := &MockModule{
+ CollectFunc: func() map[string]int64 {
+ panic("panic in Collect")
+ },
+ }
+ job := newTestJob()
+ job.module = m
+ job.UpdateEvery = 1
+
+ go func() {
+ for i := 1; i < 3; i++ {
+ time.Sleep(time.Second)
+ job.Tick(i)
+ }
+ job.Stop()
+ }()
+
+ job.MainLoop()
+
+ assert.True(t, job.Panicked())
+ assert.True(t, m.CleanupDone)
+}
+
+func TestJob_Tick(t *testing.T) {
+ job := newTestJob()
+ for i := 0; i < 3; i++ {
+ job.Tick(i)
+ }
+}
+
+func TestJob_Start(t *testing.T) {
+ job := newTestJob()
+ job.module = &MockModule{}
+
+ go func() {
+ time.Sleep(time.Second)
+ job.Stop()
+ }()
+
+ job.Start()
+}
+
+func TestBase_SetLogger(t *testing.T) {
+ var b Base
+ b.SetLogger(&logger.Logger{})
+
+ assert.NotNil(t, b.Logger)
+}
diff --git a/module/mock.go b/module/mock.go
new file mode 100644
index 0000000..d5c157d
--- /dev/null
+++ b/module/mock.go
@@ -0,0 +1,53 @@
+package module
+
+// MockModule is a mock implementation of the Module interface for use in tests.
+type MockModule struct {
+ Base
+
+ InitFunc func() bool
+ CheckFunc func() bool
+ ChartsFunc func() *Charts
+ CollectFunc func() map[string]int64
+ CleanupFunc func()
+ CleanupDone bool
+}
+
+// Init invokes InitFunc.
+func (m MockModule) Init() bool {
+ if m.InitFunc == nil {
+ return true
+ }
+ return m.InitFunc()
+}
+
+// Check invokes CheckFunc.
+func (m MockModule) Check() bool {
+ if m.CheckFunc == nil {
+ return true
+ }
+ return m.CheckFunc()
+}
+
+// Charts invokes ChartsFunc.
+func (m MockModule) Charts() *Charts {
+ if m.ChartsFunc == nil {
+ return nil
+ }
+ return m.ChartsFunc()
+}
+
+// Collect invokes CollectFunc.
+func (m MockModule) Collect() map[string]int64 {
+ if m.CollectFunc == nil {
+ return nil
+ }
+ return m.CollectFunc()
+}
+
+// Cleanup sets CleanupDone to true.
+func (m *MockModule) Cleanup() {
+ if m.CleanupFunc != nil {
+ m.CleanupFunc()
+ }
+ m.CleanupDone = true
+}
diff --git a/module/mock_test.go b/module/mock_test.go
new file mode 100644
index 0000000..f06656d
--- /dev/null
+++ b/module/mock_test.go
@@ -0,0 +1,52 @@
+package module
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestMockModule_Init(t *testing.T) {
+ m := &MockModule{}
+
+ assert.True(t, m.Init())
+ m.InitFunc = func() bool { return false }
+ assert.False(t, m.Init())
+}
+
+func TestMockModule_Check(t *testing.T) {
+ m := &MockModule{}
+
+ assert.True(t, m.Check())
+ m.CheckFunc = func() bool { return false }
+ assert.False(t, m.Check())
+}
+
+func TestMockModule_Charts(t *testing.T) {
+ m := &MockModule{}
+ c := &Charts{}
+
+ assert.Nil(t, m.Charts())
+ m.ChartsFunc = func() *Charts { return c }
+ assert.True(t, c == m.Charts())
+}
+
+func TestMockModule_Collect(t *testing.T) {
+ m := &MockModule{}
+ d := map[string]int64{
+ "1": 1,
+ }
+
+ assert.Nil(t, m.Collect())
+ m.CollectFunc = func() map[string]int64 { return d }
+ assert.Equal(t, d, m.Collect())
+}
+
+func TestMockModule_Cleanup(t *testing.T) {
+ m := &MockModule{}
+ require.False(t, m.CleanupDone)
+
+ m.Cleanup()
+ assert.True(t, m.CleanupDone)
+}
diff --git a/module/module.go b/module/module.go
new file mode 100644
index 0000000..5e5656d
--- /dev/null
+++ b/module/module.go
@@ -0,0 +1,37 @@
+package module
+
+import (
+ "github.com/netdata/go-orchestrator/logger"
+)
+
+// Module is an interface that represents a module.
+type Module interface {
+ // Init does initialization.
+	// If it returns false, the job will be disabled.
+ Init() bool
+
+ // Check is called after Init.
+	// If it returns false, the job will be disabled.
+ Check() bool
+
+ // Charts returns the chart definition.
+ // Make sure not to share returned instance.
+ Charts() *Charts
+
+ // Collect collects metrics.
+ Collect() map[string]int64
+
+ // SetLogger SetLogger
+ SetLogger(l *logger.Logger)
+
+ // Cleanup Cleanup
+ Cleanup()
+}
+
+// Base is a helper struct. All modules should embed this struct.
+type Base struct {
+ *logger.Logger
+}
+
+// SetLogger sets the module logger.
+func (b *Base) SetLogger(l *logger.Logger) { b.Logger = l }
diff --git a/module/netdataapi.go b/module/netdataapi.go
new file mode 100644
index 0000000..f1ea8e8
--- /dev/null
+++ b/module/netdataapi.go
@@ -0,0 +1,89 @@
+package module
+
+import (
+ "fmt"
+ "io"
+)
+
+type (
+ // apiWriter write native netdata orchestrator API
+ // https://github.com/firehol/netdata/wiki/External-Plugins#native-netdata-plugin-api
+ apiWriter struct {
+ // Out write to
+ io.Writer
+ }
+)
+
+// chart defines a new chart.
+func (w *apiWriter) chart(
+ typeID string,
+ ID string,
+ name string,
+ title string,
+ units string,
+ family string,
+ context string,
+ chartType chartType,
+ priority int,
+ updateEvery int,
+ options Opts,
+ plugin string,
+ module string) error {
+ _, err := fmt.Fprintf(w, "CHART '%s.%s' '%s' '%s' '%s' '%s' '%s' '%s' '%d' '%d' '%s' '%s' '%s'\n",
+ typeID, ID, name, title, units, family, context, chartType, priority, updateEvery, options, plugin, module)
+ return err
+}
+
+// dimension defines a new dimension for the chart.
+func (w *apiWriter) dimension(
+ ID string,
+ name string,
+ algorithm dimAlgo,
+ multiplier dimDivMul,
+ divisor dimDivMul,
+ options DimOpts) error {
+ _, err := fmt.Fprintf(w, "DIMENSION '%s' '%s' '%s' '%s' '%s' '%s'\n",
+ ID, name, algorithm, multiplier, divisor, options)
+ return err
+}
+
+// begin initializes data collection for a chart.
+func (w *apiWriter) begin(typeID string, ID string, msSince int) error {
+ var err error
+ if msSince > 0 {
+ _, err = fmt.Fprintf(w, "BEGIN %s.%s %d\n", typeID, ID, msSince)
+ } else {
+ _, err = fmt.Fprintf(w, "BEGIN %s.%s\n", typeID, ID)
+ }
+ return err
+}
+
+// dimSet sets the value of a dimension for the initialized chart.
+func (w *apiWriter) dimSet(ID string, value int64) error {
+ _, err := fmt.Fprintf(w, "SET %s = %d\n", ID, value)
+ return err
+}
+
+// dimSetEmpty sets the empty value of a dimension for the initialized chart.
+func (w *apiWriter) dimSetEmpty(ID string) error {
+ _, err := fmt.Fprintf(w, "SET %s = \n", ID)
+ return err
+}
+
+// varSet sets the value of a variable for the initialized chart.
+func (w *apiWriter) varSet(ID string, value int64) error {
+ _, err := fmt.Fprintf(w, "VARIABLE CHART %s = %d\n", ID, value)
+ return err
+}
+
+// end complete data collection for the initialized chart.
+func (w *apiWriter) end() error {
+ _, err := fmt.Fprintf(w, "END\n\n")
+ return err
+}
+
+// flush ignore the last collected values.
+func (w *apiWriter) flush() error {
+ _, err := fmt.Fprintf(w, "FLUSH\n")
+ return err
+}
diff --git a/module/netdataapi_test.go b/module/netdataapi_test.go
new file mode 100644
index 0000000..b11e75e
--- /dev/null
+++ b/module/netdataapi_test.go
@@ -0,0 +1,151 @@
+package module
+
+import (
+ "bytes"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNetdataAPI_chart(t *testing.T) {
+ b := &bytes.Buffer{}
+ netdataAPI := apiWriter{Writer: b}
+
+ _ = netdataAPI.chart(
+ "",
+ "id",
+ "name",
+ "title",
+ "units",
+ "family",
+ "context",
+ Line,
+ 1,
+ 1,
+ Opts{},
+ "orchestrator",
+ "module",
+ )
+
+ assert.Equal(
+ t,
+ "CHART '.id' 'name' 'title' 'units' 'family' 'context' 'line' '1' '1' '' 'orchestrator' 'module'\n",
+ b.String(),
+ )
+}
+
+func TestNetdataAPI_dimension(t *testing.T) {
+ b := &bytes.Buffer{}
+ netdataAPI := apiWriter{Writer: b}
+
+ _ = netdataAPI.dimension(
+ "id",
+ "name",
+ Absolute,
+ 1,
+ 1,
+ DimOpts{},
+ )
+
+ assert.Equal(
+ t,
+ "DIMENSION 'id' 'name' 'absolute' '1' '1' ''\n",
+ b.String(),
+ )
+}
+
+func TestNetdataAPI_begin(t *testing.T) {
+ b := &bytes.Buffer{}
+ netdataAPI := apiWriter{Writer: b}
+
+ _ = netdataAPI.begin(
+ "typeID",
+ "id",
+ 0,
+ )
+
+ assert.Equal(
+ t,
+ "BEGIN typeID.id\n",
+ b.String(),
+ )
+
+ b.Reset()
+
+ _ = netdataAPI.begin(
+ "typeID",
+ "id",
+ 1,
+ )
+
+ assert.Equal(
+ t,
+ "BEGIN typeID.id 1\n",
+ b.String(),
+ )
+}
+
+func TestNetdataAPI_dimSet(t *testing.T) {
+ b := &bytes.Buffer{}
+ netdataAPI := apiWriter{Writer: b}
+
+ _ = netdataAPI.dimSet("id", 100)
+
+ assert.Equal(
+ t,
+ "SET id = 100\n",
+ b.String(),
+ )
+}
+
+func TestNetdataAPI_dimSetEmpty(t *testing.T) {
+ b := &bytes.Buffer{}
+ netdataAPI := apiWriter{Writer: b}
+
+ _ = netdataAPI.dimSetEmpty("id")
+
+ assert.Equal(
+ t,
+ "SET id = \n",
+ b.String(),
+ )
+}
+
+func TestNetdataAPI_varSet(t *testing.T) {
+ b := &bytes.Buffer{}
+ netdataAPI := apiWriter{Writer: b}
+
+ _ = netdataAPI.varSet("id", 100)
+
+ assert.Equal(
+ t,
+ "VARIABLE CHART id = 100\n",
+ b.String(),
+ )
+}
+
+func TestNetdataAPI_end(t *testing.T) {
+ b := &bytes.Buffer{}
+ netdataAPI := apiWriter{Writer: b}
+
+ _ = netdataAPI.end()
+
+ assert.Equal(
+ t,
+ "END\n\n",
+ b.String(),
+ )
+}
+
+func TestNetdataAPI_flush(t *testing.T) {
+ b := &bytes.Buffer{}
+ netdataAPI := apiWriter{Writer: b}
+
+ _ = netdataAPI.flush()
+
+ assert.Equal(
+ t,
+ "FLUSH\n",
+ b.String(),
+ )
+}
diff --git a/module/registry.go b/module/registry.go
new file mode 100644
index 0000000..7ea0dfa
--- /dev/null
+++ b/module/registry.go
@@ -0,0 +1,37 @@
+package module
+
+import "fmt"
+
+// Defaults is a set of module default parameters.
+type Defaults struct {
+ UpdateEvery int
+ AutoDetectionRetry int
+ Priority int
+ Disabled bool
+}
+
+type (
+ // Creator is a Job builder.
+ Creator struct {
+ Defaults
+ Create func() Module
+ }
+ // Registry is a collection of Creators.
+ Registry map[string]Creator
+)
+
+// DefaultRegistry is the default global Registry used by Register.
+var DefaultRegistry = Registry{}
+
+// Register registers a module in the DefaultRegistry.
+func Register(name string, creator Creator) {
+ DefaultRegistry.Register(name, creator)
+}
+
+// Register registers a module.
+func (r Registry) Register(name string, creator Creator) {
+ if _, ok := r[name]; ok {
+ panic(fmt.Sprintf("%s is already in registry", name))
+ }
+ r[name] = creator
+}
diff --git a/module/registry_test.go b/module/registry_test.go
new file mode 100644
index 0000000..1d8633d
--- /dev/null
+++ b/module/registry_test.go
@@ -0,0 +1,32 @@
+package module
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestRegister(t *testing.T) {
+ modName := "modName"
+ registry := make(Registry)
+
+ // OK case
+ assert.NotPanics(
+ t,
+ func() {
+ registry.Register(modName, Creator{})
+ })
+
+ _, exist := registry[modName]
+
+ require.True(t, exist)
+
+ // Panic case
+ assert.Panics(
+ t,
+ func() {
+ registry.Register(modName, Creator{})
+ })
+
+}
diff --git a/orchestrator.go b/orchestrator.go
new file mode 100644
index 0000000..8764f33
--- /dev/null
+++ b/orchestrator.go
@@ -0,0 +1,224 @@
+package orchestrator
+
+import (
+ "fmt"
+ "io"
+ "os"
+ "os/signal"
+ "path"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/netdata/go-orchestrator/cli"
+ "github.com/netdata/go-orchestrator/logger"
+ "github.com/netdata/go-orchestrator/module"
+ "github.com/netdata/go-orchestrator/pkg/multipath"
+
+ "github.com/mattn/go-isatty"
+ "gopkg.in/yaml.v2"
+)
+
+var (
+ log = logger.New("plugin", "main", "main")
+ jobStatusesFile = "god-jobs-statuses.json"
+)
+
// Job is an interface that represents a job: a single configured
// instance of a module that can be auto-detected, ticked by the main
// loop, started and stopped.
type Job interface {
	// FullName returns the job's full name, used to de-duplicate jobs (see startOnce).
	FullName() string
	// ModuleName returns the name of the module the job belongs to.
	ModuleName() string
	// Name returns the job name.
	Name() string
	// AutoDetection performs a detection attempt and reports success.
	AutoDetection() bool
	// AutoDetectionEvery returns the retry interval in seconds (see recheckTask).
	AutoDetectionEvery() int
	// RetryAutoDetection reports whether a failed detection should be retried.
	RetryAutoDetection() bool
	// Panicked reports whether the job panicked (checked after AutoDetection).
	Panicked() bool
	// Tick notifies the job of a clock tick from the main loop.
	Tick(clock int)
	// Start runs the job; it is launched in its own goroutine (see startOnce).
	Start()
	// Stop stops the job.
	Stop()
}
+
// Config is the orchestrator configuration (e.g. test.d.conf.yml).
type Config struct {
	// Enabled disables the whole plugin when false (Setup emits DISABLE).
	Enabled bool `yaml:"enabled"`
	// DefaultRun is the fallback enabled state for modules with no
	// explicit entry in Modules (see isModuleEnabled).
	DefaultRun bool `yaml:"default_run"`
	// MaxProcs caps GOMAXPROCS when > 0 (see Setup).
	MaxProcs int `yaml:"max_procs"`
	// Modules maps module name to its enabled state.
	Modules map[string]bool `yaml:"modules"`
}
+
// UnmarshalYAML implements yaml.Unmarshaler.
//
// Besides the regular fields it also accepts module enable/disable
// flags written at the top level of the document (e.g. "module1: yes")
// and merges them into c.Modules.
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
	// Decode into an alias type first: the alias has no UnmarshalYAML
	// method, so this does not recurse.
	type config Config
	cc := config(*c)
	if err := unmarshal(&cc); err != nil {
		return err
	}
	*c = Config(cc)

	// Second pass: read the raw document to pick up top-level keys.
	var m map[string]interface{}
	if err := unmarshal(&m); err != nil {
		return err
	}

	for key, value := range m {
		switch key {
		case "enabled", "default_run", "max_procs", "modules":
			continue // regular fields, already decoded above
		}
		// Keep the key only if its value round-trips as a bool;
		// anything else is silently ignored.
		var b bool
		if in, err := yaml.Marshal(value); err != nil || yaml.Unmarshal(in, &b) != nil {
			continue
		}
		if c.Modules == nil {
			c.Modules = make(map[string]bool)
		}
		c.Modules[key] = b
	}

	return nil
}
+
+func (c Config) isModuleEnabled(module string, explicit bool) bool {
+ if run, ok := c.Modules[module]; ok {
+ return run
+ }
+ if explicit {
+ return false
+ }
+ return c.DefaultRun
+}
+
+// New creates Orchestrator.
+func New() *Orchestrator {
+ return &Orchestrator{
+ ConfigPath: multipath.New(
+ os.Getenv("NETDATA_USER_CONFIG_DIR"),
+ os.Getenv("NETDATA_STOCK_CONFIG_DIR"),
+ ),
+ Config: &Config{Enabled: true, DefaultRun: true},
+ Registry: module.DefaultRegistry,
+ Out: os.Stdout,
+ varLibDir: os.Getenv("NETDATA_LIB_DIR"),
+ modules: make(module.Registry),
+ jobStartCh: make(chan Job),
+ jobStartStop: make(chan struct{}),
+ mainLoopStop: make(chan struct{}),
+ jobsStatuses: &jobsStatuses{mux: new(sync.Mutex), items: make(map[string]map[string]string)},
+ }
+}
+
// Orchestrator represents the plugin orchestrator: it loads the
// configuration, creates jobs from registered modules and drives their
// lifecycle (see Setup and Serve).
type Orchestrator struct {
	Name                 string              // plugin name; also the default config file base name
	Out                  io.Writer           // destination for plugin protocol output (e.g. DISABLE)
	Registry             module.Registry     // all available modules
	Option               *cli.Option         // parsed command line options
	ConfigPath           multipath.MultiPath // directories searched for config files
	Config               *Config             // plugin configuration, populated by Setup
	ModulesConfigDirName string

	varLibDir  string // NETDATA_LIB_DIR; job statuses are persisted there (see Serve)
	configName string // config file name; defaults to Name + ".conf" in Setup

	jobStartCh   chan Job      // jobs queued for starting (consumed by jobStartLoop)
	jobStartStop chan struct{} // terminates jobStartLoop
	mainLoopStop chan struct{} // terminates mainLoop

	jobStatusWriter io.Writer
	jobsStatuses    *jobsStatuses   // current status per module/job
	modules         module.Registry // modules selected to run, filled by Setup
	loopQueue       loopQueue       // started jobs receiving clock ticks
}
+
// Serve runs the orchestrator: it starts the signal handler, the
// keep-alive writer (when stdout is not a terminal), the job start
// loop and, when possible, the periodic job-statuses saver, then
// blocks in the main tick loop until stop is called.
func (o *Orchestrator) Serve() {
	go signalHandling()

	isAtty := isatty.IsTerminal(os.Stdout.Fd())

	// When driven by the agent (stdout is a pipe, not a terminal),
	// emit periodic newlines so the plugin is not considered dead.
	if !isAtty {
		go keepAlive()
	}

	go o.jobStartLoop()

	for _, job := range o.createJobs() {
		o.jobStartCh <- job
	}

	// Persist job statuses to NETDATA_LIB_DIR every 10 seconds.
	if !isAtty && o.varLibDir != "" {
		w := &fileWriter{path: path.Join(o.varLibDir, jobStatusesFile)}
		s := newJobsStatusesSaver(w, o.jobsStatuses, time.Second*10)
		go s.mainLoop()
		defer s.stop()
	}

	o.mainLoop()
}
+
+func (o *Orchestrator) mainLoop() {
+ log.Info("start main loop")
+ tk := NewTicker(time.Second)
+
+LOOP:
+ for {
+ select {
+ case <-o.mainLoopStop:
+ break LOOP
+ case clock := <-tk.C:
+ o.runOnce(clock)
+ }
+ }
+}
+
// runOnce delivers a single clock tick to every job in the loop queue.
func (o *Orchestrator) runOnce(clock int) {
	log.Debugf("tick %d", clock)
	o.loopQueue.notify(clock)
}
+
// stop terminates the job start loop and then the main loop.
// Both sends block until the corresponding loop receives, so stop must
// only be called while both loops are running.
func (o *Orchestrator) stop() {
	o.jobStartStop <- struct{}{}
	o.mainLoopStop <- struct{}{}
}
+
// signalHandling blocks until one of the handled signals arrives, then
// terminates the whole process: exit code 1 for SIGPIPE (stdout pipe
// closed — presumably the agent went away), 0 for SIGINT/SIGHUP.
func signalHandling() {
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGHUP, syscall.SIGPIPE)

	switch <-signalChan {
	case syscall.SIGINT:
		log.Info("SIGINT received. Terminating...")
	case syscall.SIGHUP:
		log.Info("SIGHUP received. Terminating...")
	case syscall.SIGPIPE:
		log.Critical("SIGPIPE received. Terminating...")
		os.Exit(1)
	}
	os.Exit(0)
}
+
// keepAlive emits a newline on stdout once per second so netdata does
// not consider an otherwise-quiet plugin dead. It never returns and is
// intended to run in its own goroutine (see Serve).
func keepAlive() {
	// Use an explicit Ticker rather than time.Tick: same behavior, but
	// keeps the code clean under staticcheck (time.Tick leaks its
	// ticker in the general case).
	t := time.NewTicker(time.Second)
	defer t.Stop()
	for range t.C {
		_, _ = fmt.Fprint(os.Stdout, "\n")
	}
}
+
+type loopQueue struct {
+ mux sync.Mutex
+ queue []Job
+}
+
+func (q *loopQueue) add(job Job) {
+ q.mux.Lock()
+ defer q.mux.Unlock()
+
+ q.queue = append(q.queue, job)
+}
+
+func (q *loopQueue) notify(clock int) {
+ q.mux.Lock()
+ defer q.mux.Unlock()
+
+ for _, job := range q.queue {
+ job.Tick(clock)
+ }
+}
diff --git a/orchestrator_test.go b/orchestrator_test.go
new file mode 100644
index 0000000..48b7395
--- /dev/null
+++ b/orchestrator_test.go
@@ -0,0 +1,184 @@
+package orchestrator
+
+import (
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/netdata/go-orchestrator/cli"
+ "github.com/netdata/go-orchestrator/module"
+ "github.com/netdata/go-orchestrator/pkg/multipath"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
// TestNew checks that New populates every required default.
func TestNew(t *testing.T) {
	o := New()
	assert.IsType(t, (*Orchestrator)(nil), o)
	assert.NotNil(t, o.Out)
	assert.NotNil(t, o.ConfigPath)
	assert.NotNil(t, o.Registry)
	assert.NotNil(t, o.Config)
}
+
// TestOrchestrator_lifecycle runs a passing ("normal") and a failing
// ("fail") module end-to-end for ~2 seconds and verifies how many times
// each module hook (init/check/charts/collect/cleanup) fired.
func TestOrchestrator_lifecycle(t *testing.T) {
	o := New()
	o.Name = "test.d"

	var mtx sync.Mutex
	counter := map[string]int{}

	// mod builds a MockModule whose hooks count their invocations under
	// mtx; the module named "fail" fails its check.
	mod := func(name string) module.Module {
		return &module.MockModule{
			InitFunc: func() bool {
				mtx.Lock()
				defer mtx.Unlock()
				counter[name+"_init"]++
				log.Infof("[%s] init", name)
				return true
			},
			CheckFunc: func() bool {
				mtx.Lock()
				defer mtx.Unlock()
				counter[name+"_check"]++
				log.Infof("[%s] check", name)
				return name != "fail"
			},
			ChartsFunc: func() *module.Charts {
				mtx.Lock()
				defer mtx.Unlock()
				counter[name+"_charts"]++
				log.Infof("[%s] charts", name)
				return &module.Charts{
					&module.Chart{ID: "id", Title: "title", Units: "units", Dims: module.Dims{{ID: "id1"}}},
				}
			},
			CollectFunc: func() map[string]int64 {
				mtx.Lock()
				defer mtx.Unlock()
				counter[name+"_collect"]++
				log.Infof("[%s] collect", name)
				return map[string]int64{"id1": 1}
			},
			CleanupFunc: func() {
				mtx.Lock()
				defer mtx.Unlock()
				counter[name+"_cleanup"]++
				log.Infof("[%s] cleanup", name)
			},
		}
	}

	o.Option = &cli.Option{Module: "all"}
	o.ConfigPath = multipath.New("./testdata")
	o.Registry = module.Registry{
		"normal": module.Creator{Create: func() module.Module { return mod("normal") }},
		"fail":   module.Creator{Create: func() module.Module { return mod("fail") }},
	}
	o.configName = "test.d.conf.yml"

	require.True(t, o.Setup())

	go o.Serve()

	time.Sleep(time.Second * 2)

	o.stop()

	for _, job := range o.loopQueue.queue {
		job.Stop()
	}
	time.Sleep(time.Second)

	// Read counters under the same mutex the hooks use.
	func() {
		mtx.Lock()
		defer mtx.Unlock()
		assert.Equal(t, 1, counter["normal_init"])
		assert.Equal(t, 1, counter["fail_init"])
		assert.Equal(t, 1, counter["normal_check"])
		assert.Equal(t, 1, counter["fail_check"])
		assert.Equal(t, 1, counter["normal_charts"])
		assert.Equal(t, 0, counter["fail_charts"])
		assert.Equal(t, 2, counter["normal_collect"])
		assert.Equal(t, 0, counter["fail_collect"])
		assert.Equal(t, 1, counter["normal_cleanup"])
		assert.Equal(t, 1, counter["fail_cleanup"])
	}()
}
+
// TestOrchestrator_Serve is a smoke test: two always-succeeding modules
// are served for ~3 seconds, then everything is stopped. It passes as
// long as nothing panics or deadlocks.
func TestOrchestrator_Serve(t *testing.T) {
	o := New()
	o.Name = "test.d"

	mod := func() module.Module {
		return &module.MockModule{
			InitFunc:  func() bool { return true },
			CheckFunc: func() bool { return true },
			ChartsFunc: func() *module.Charts {
				return &module.Charts{
					&module.Chart{
						ID:    "id",
						Title: "title",
						Units: "units",
						Dims: module.Dims{
							{ID: "id1"},
							{ID: "id2"},
						},
					},
				}
			},
			CollectFunc: func() map[string]int64 {
				return map[string]int64{
					"id1": 1,
					"id2": 2,
				}
			},
		}
	}

	o.Option = &cli.Option{Module: "all"}
	o.ConfigPath = multipath.New("./testdata")
	o.Registry = module.Registry{
		"module1": module.Creator{Create: func() module.Module { return mod() }},
		"module2": module.Creator{Create: func() module.Module { return mod() }},
	}
	o.configName = "test.d.conf.yml"

	require.True(t, o.Setup())

	go o.Serve()

	time.Sleep(time.Second * 3)

	o.stop()

	for _, job := range o.loopQueue.queue {
		job.Stop()
	}
}
+
// TestLoopQueue_add hammers add from several goroutines and checks that
// no element is lost (run with -race to catch locking regressions).
func TestLoopQueue_add(t *testing.T) {
	var l loopQueue
	var wg sync.WaitGroup

	workers := 10
	addNum := 1000

	f := func() {
		for i := 0; i < addNum; i++ {
			l.add(nil)
		}
		wg.Done()
	}

	wg.Add(workers)

	for i := 0; i < workers; i++ {
		go f()
	}

	wg.Wait()

	assert.Equal(t, workers*addNum, len(l.queue))
}
diff --git a/pkg/multipath/multipath.go b/pkg/multipath/multipath.go
new file mode 100644
index 0000000..247a847
--- /dev/null
+++ b/pkg/multipath/multipath.go
@@ -0,0 +1,57 @@
+package multipath
+
+import (
+ "fmt"
+ "os"
+ "path"
+
+ "github.com/mitchellh/go-homedir"
+)
+
// ErrNotFound is returned by Find when a file is absent from every
// directory of a MultiPath.
type ErrNotFound struct{ msg string }

// Error implements the error interface.
func (e ErrNotFound) Error() string { return e.msg }

// IsNotFound returns a boolean indicating whether the error is
// ErrNotFound or not.
func IsNotFound(err error) bool {
	_, ok := err.(ErrNotFound)
	return ok
}
+
+// MultiPath multi-paths
+type MultiPath []string
+
+// New New multi-paths
+func New(paths ...string) MultiPath {
+ set := map[string]bool{}
+ mPath := make(MultiPath, 0)
+
+ for _, dir := range paths {
+ if dir == "" {
+ continue
+ }
+ if d, err := homedir.Expand(dir); err != nil {
+ dir = d
+ }
+ if !set[dir] {
+ mPath = append(mPath, dir)
+ set[dir] = true
+ }
+ }
+
+ return mPath
+}
+
+// Find find a file in given paths
+func (p MultiPath) Find(filename string) (string, error) {
+ for _, dir := range p {
+ file := path.Join(dir, filename)
+ if _, err := os.Stat(file); !os.IsNotExist(err) {
+ return file, nil
+ }
+ }
+ return "", ErrNotFound{msg: fmt.Sprintf("can't find '%s' in %v", filename, p)}
+}
diff --git a/pkg/multipath/multipath_test.go b/pkg/multipath/multipath_test.go
new file mode 100644
index 0000000..a19a706
--- /dev/null
+++ b/pkg/multipath/multipath_test.go
@@ -0,0 +1,37 @@
+package multipath
+
+import (
+ "errors"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
// TestNew checks that empty and duplicate entries are dropped.
func TestNew(t *testing.T) {
	assert.Len(
		t,
		New("path1", "path2", "path2", "", "path3"),
		3,
	)
}
+
// TestMultiPath_Find checks lookup of missing and existing files
// against the tests/ fixture directory.
func TestMultiPath_Find(t *testing.T) {
	m := New("path1", "tests")

	v, err := m.Find("not exist")
	assert.Zero(t, v)
	assert.Error(t, err)

	v, err = m.Find("test-empty.conf")
	assert.Equal(t, "tests/test-empty.conf", v)
	assert.Nil(t, err)

	v, err = m.Find("test.conf")
	assert.Equal(t, "tests/test.conf", v)
	assert.Nil(t, err)
}
+
// TestIsNotFound checks the ErrNotFound type predicate.
func TestIsNotFound(t *testing.T) {
	assert.True(t, IsNotFound(ErrNotFound{}))
	assert.False(t, IsNotFound(errors.New("")))
}
diff --git a/pkg/multipath/tests/test-empty.conf b/pkg/multipath/tests/test-empty.conf
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/pkg/multipath/tests/test-empty.conf
diff --git a/pkg/multipath/tests/test.conf b/pkg/multipath/tests/test.conf
new file mode 100644
index 0000000..aebe647
--- /dev/null
+++ b/pkg/multipath/tests/test.conf
@@ -0,0 +1 @@
+not empty! \ No newline at end of file
diff --git a/save.go b/save.go
new file mode 100644
index 0000000..ac4b2ba
--- /dev/null
+++ b/save.go
@@ -0,0 +1,132 @@
+package orchestrator
+
+import (
+ "encoding/json"
+ "io"
+ "os"
+ "sync"
+ "time"
+)
+
+func newJobsStatusesSaver(w io.Writer, js *jobsStatuses, saveEvery time.Duration) *jobsStatusesSaver {
+ return &jobsStatusesSaver{
+ Writer: w,
+ js: js,
+ freq: saveEvery,
+ once: sync.Once{},
+ stopCh: make(chan struct{}),
+ }
+}
+
// jobsStatusesSaver periodically serializes jobsStatuses and writes the
// result to the embedded Writer (see Serve, where it targets a fileWriter).
type jobsStatusesSaver struct {
	io.Writer
	js     *jobsStatuses
	freq   time.Duration // how often runOnce fires
	once   sync.Once     // guards closing stopCh in stop
	stopCh chan struct{}
}
+
+func (s *jobsStatusesSaver) mainLoop() {
+ t := time.NewTicker(s.freq)
+ defer t.Stop()
+LOOP:
+ for {
+ select {
+ case <-s.stopCh:
+ break LOOP
+ case <-t.C:
+ s.runOnce()
+ }
+ }
+}
+
// stop terminates mainLoop; safe to call more than once.
func (s *jobsStatusesSaver) stop() {
	s.once.Do(func() { close(s.stopCh) })
}
+
// runOnce serializes the statuses and writes them out, logging (but not
// propagating) any failure.
func (s *jobsStatusesSaver) runOnce() {
	b, err := s.js.asBytes()
	if err != nil {
		log.Errorf("error on converting jobs statuses : %v", err)
		return
	}
	_, err = s.Write(b)
	if err != nil {
		log.Errorf("error on writing jobs statuses : %v", err)
	}
}
+
// fileWriter is an io.Writer that truncates and rewrites a file on
// every Write, so the file always holds only the latest payload.
type fileWriter struct {
	path string
}

// Write creates (or truncates) the target file and writes data to it.
func (s fileWriter) Write(data []byte) (int, error) {
	f, err := os.Create(s.path)
	if err != nil {
		return 0, err
	}
	defer f.Close()

	return f.Write(data)
}
+
// newJobsStatuses returns an empty, ready-to-use jobsStatuses.
func newJobsStatuses() *jobsStatuses {
	return &jobsStatuses{mux: new(sync.Mutex), items: make(map[string]map[string]string)}
}

// jobsStatuses is a concurrency-safe two-level map:
// module name -> job name -> status string (e.g. "active", "recovering";
// see startOnce).
type jobsStatuses struct {
	mux   *sync.Mutex
	items map[string]map[string]string
}
+
+func (js jobsStatuses) contains(job Job) bool {
+ js.mux.Lock()
+ defer js.mux.Unlock()
+
+ v, ok := js.items[job.ModuleName()]
+ if !ok {
+ return false
+ }
+ _, ok = v[job.Name()]
+ return ok
+}
+
+func (js *jobsStatuses) put(job Job, status string) {
+ js.mux.Lock()
+ defer js.mux.Unlock()
+
+ _, ok := js.items[job.ModuleName()]
+ if !ok {
+ js.items[job.ModuleName()] = make(map[string]string)
+ }
+ js.items[job.ModuleName()][job.Name()] = status
+}
+
// remove deletes the job's status entry; it is a no-op when absent.
func (js *jobsStatuses) remove(job Job) {
	js.mux.Lock()
	defer js.mux.Unlock()

	delete(js.items[job.ModuleName()], job.Name())
}
+
// asBytes returns the statuses serialized as indented JSON.
func (js *jobsStatuses) asBytes() ([]byte, error) {
	js.mux.Lock()
	defer js.mux.Unlock()

	return json.MarshalIndent(js.items, "", " ")
}
+
// loadJobsStatusesFromFile reads previously saved job statuses
// (the JSON written by jobsStatusesSaver) from the file at absPath.
func loadJobsStatusesFromFile(absPath string) (*jobsStatuses, error) {
	f, err := os.Open(absPath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	// items is left nil here: Decode allocates the map itself.
	s := &jobsStatuses{mux: new(sync.Mutex)}
	if err = json.NewDecoder(f).Decode(&s.items); err != nil {
		return nil, err
	}
	return s, nil
}
diff --git a/save_test.go b/save_test.go
new file mode 100644
index 0000000..122b59a
--- /dev/null
+++ b/save_test.go
@@ -0,0 +1,25 @@
+package orchestrator
+
+import (
+ "bytes"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
// Test_jobStatusesSaveLoop checks that the saver produces output both
// from an explicit runOnce and from its main loop.
// NOTE(review): saveEvery is the untyped constant 1, i.e. one
// NANOSECOND — probably meant to be time.Second, though it works as-is.
func Test_jobStatusesSaveLoop(t *testing.T) {
	b := &bytes.Buffer{}
	js := newJobsStatuses()
	s := newJobsStatusesSaver(b, js, 1)

	js.put(&mockJob{}, "active")
	s.runOnce()
	go func() {
		time.Sleep(time.Second * 3)
		s.stop()
	}()
	s.mainLoop()

	assert.NotZero(t, b.Bytes())
}
diff --git a/setup.go b/setup.go
new file mode 100644
index 0000000..2ad2903
--- /dev/null
+++ b/setup.go
@@ -0,0 +1,97 @@
+package orchestrator
+
+import (
+ "fmt"
+ "os"
+ "runtime"
+
+ "github.com/netdata/go-orchestrator/logger"
+ "github.com/netdata/go-orchestrator/pkg/multipath"
+
+ "github.com/mattn/go-isatty"
+)
+
// Setup validates the configuration and prepares the orchestrator to
// run. It returns false (after logging the reason) when the plugin must
// not start: missing name/options/paths/registry, a broken config file,
// the plugin being disabled, or no modules left to run.
func (o *Orchestrator) Setup() bool {
	if o.Name == "" {
		log.Critical("name not set")
		return false
	}

	// TODO: fix — plugin name is set on the logger only when stdout is
	// not a terminal; confirm whether it should be unconditional.
	if !isatty.IsTerminal(os.Stdout.Fd()) {
		logger.SetPluginName(o.Name, log)
	}

	if o.Option == nil {
		log.Critical("cli options not set")
		return false
	}
	// Command-line config dirs override the default search path.
	if len(o.Option.ConfigDir) != 0 {
		o.ConfigPath = multipath.New(o.Option.ConfigDir...)
	}
	if len(o.ConfigPath) == 0 {
		log.Critical("config path not set or empty")
		return false
	}
	if len(o.Registry) == 0 {
		log.Critical("registry not set or empty")
		return false
	}

	if o.configName == "" {
		o.configName = o.Name + ".conf"
	}

	configFile, err := o.ConfigPath.Find(o.configName)

	if err != nil {
		// A missing config file is fine (defaults apply); any other
		// lookup error is fatal.
		if !multipath.IsNotFound(err) {
			log.Criticalf("find config file error : %v", err)
			return false
		}
		log.Warningf("find config file error : %v, will use default configuration", err)
	} else {
		if err := loadYAML(o.Config, configFile); err != nil {
			log.Criticalf("loadYAML config error : %v", err)
			return false
		}
	}

	if !o.Config.Enabled {
		log.Info("disabled in configuration file")
		// Tell the agent this plugin should not be restarted.
		_, _ = fmt.Fprintln(o.Out, "DISABLE")
		return false
	}

	// Select modules to run: either the single module named on the
	// command line, or all modules filtered by the configuration.
	isAll := o.Option.Module == "all"
	for name, creator := range o.Registry {
		if !isAll && o.Option.Module != name {
			continue
		}
		if isAll && creator.Disabled && !o.Config.isModuleEnabled(name, true) {
			log.Infof("module '%s' disabled by default", name)
			continue
		}
		if isAll && !o.Config.isModuleEnabled(name, false) {
			log.Infof("module '%s' disabled in configuration file", name)
			continue
		}
		o.modules[name] = creator
	}

	if len(o.modules) == 0 {
		log.Critical("no modules to run")
		return false
	}

	if o.Config.MaxProcs > 0 {
		log.Infof("maximum number of used CPUs set to %d", o.Config.MaxProcs)
		runtime.GOMAXPROCS(o.Config.MaxProcs)
	} else {
		log.Infof("maximum number of used CPUs %d", runtime.NumCPU())
	}

	log.Infof("minimum update every %d", o.Option.UpdateEvery)

	return true
}
diff --git a/setup_test.go b/setup_test.go
new file mode 100644
index 0000000..3ea6d16
--- /dev/null
+++ b/setup_test.go
@@ -0,0 +1,160 @@
+package orchestrator
+
+import (
+ "runtime"
+ "testing"
+
+ "github.com/netdata/go-orchestrator/cli"
+ "github.com/netdata/go-orchestrator/module"
+ "github.com/netdata/go-orchestrator/pkg/multipath"
+
+ "github.com/stretchr/testify/assert"
+)
+
// Setup must fail when Name is not set.
func TestOrchestrator_SetupNoName(t *testing.T) {
	o := New()
	assert.False(t, o.Setup())
}
+
// Setup must fail when the cli options are not set.
func TestOrchestrator_SetupNoOptions(t *testing.T) {
	o := New()
	o.Name = "test"
	assert.False(t, o.Setup())
}
+
// Setup must fail when the config path is nil or empty.
func TestOrchestrator_SetupNoConfigPath(t *testing.T) {
	o := New()
	o.Name = "test"
	o.Option = &cli.Option{}
	o.ConfigPath = nil
	assert.False(t, o.Setup())

	o.ConfigPath = multipath.New()
	assert.False(t, o.Setup())
}
+
// Setup must fail when the registry is nil or empty.
func TestOrchestrator_SetupNoRegistry(t *testing.T) {
	o := New()
	o.Name = "test"
	o.Option = &cli.Option{}
	o.ConfigPath = multipath.New("./testdata")
	assert.False(t, o.Setup())

	o.Registry = make(module.Registry)
	assert.False(t, o.Setup())
}
+
// Setup must succeed with defaults when no config file matches
// configName ("test.conf" is absent from ./testdata).
func TestOrchestrator_SetupNoConfig(t *testing.T) {
	o := New()
	o.Name = "test"
	o.Option = &cli.Option{Module: "all"}
	o.ConfigPath = multipath.New("./testdata")
	o.Registry = module.Registry{"module1": module.Creator{}}
	assert.True(t, o.Setup())
}
+
// Setup must fail when the config file exists but is not valid YAML.
// NOTE(review): named TestPlugin_* and uses variable p, unlike the
// TestOrchestrator_*/o convention of the sibling tests.
func TestPlugin_SetupBrokenConfig(t *testing.T) {
	p := New()
	p.Name = "test"
	p.Option = &cli.Option{}
	p.ConfigPath = multipath.New("./testdata")
	p.Registry = module.Registry{"module1": module.Creator{}}
	p.configName = "test.d.conf-broken.yml"
	assert.False(t, p.Setup())
}
+
// Setup must succeed when the config file is empty (defaults apply).
func TestOrchestrator_SetupEmptyConfig(t *testing.T) {
	o := New()
	o.Name = "test"
	o.Option = &cli.Option{Module: "all"}
	o.ConfigPath = multipath.New("./testdata")
	o.Registry = module.Registry{"module1": module.Creator{}}
	o.configName = "test.d.conf-empty.yml"

	assert.True(t, o.Setup())
}
// Module flags written at the top level of the config (outside the
// "modules" mapping) must still be picked up by Config.UnmarshalYAML.
func TestOrchestrator_SetupInvalidModulesSectionConfig(t *testing.T) {
	o := New()
	o.Name = "test"
	o.Option = &cli.Option{Module: "all"}
	o.ConfigPath = multipath.New("./testdata")
	o.Registry = module.Registry{"module1": module.Creator{}, "module2": module.Creator{}}
	o.configName = "test.d.conf-invalid-modules.yml"

	assert.True(t, o.Setup())
	assert.Len(t, o.Config.Modules, 2)
}
+
// Setup must fail when the config file disables the whole plugin.
func TestOrchestrator_SetupDisabledInConfig(t *testing.T) {
	o := New()
	o.Name = "test"
	o.Option = &cli.Option{}
	o.ConfigPath = multipath.New("./testdata")
	o.Registry = module.Registry{"module1": module.Creator{}}
	o.configName = "test.d.conf-disabled.yml"
	assert.False(t, o.Setup())
}
+
// Setup must fail when the module requested on the command line is not
// in the registry, leaving nothing to run.
func TestOrchestrator_SetupNoModulesToRun(t *testing.T) {
	o := New()
	o.Name = "test"
	o.Option = &cli.Option{Module: "module3"}
	o.ConfigPath = multipath.New("./testdata")
	o.Registry = module.Registry{"module1": module.Creator{}}
	o.configName = "test.d.conf.yml"

	assert.Len(t, o.modules, 0)
	assert.False(t, o.Setup())
}
+
// Setup must apply Config.MaxProcs to runtime.GOMAXPROCS.
func TestOrchestrator_SetupSetGOMAXPROCS(t *testing.T) {
	o := New()
	o.Name = "test"
	o.Option = &cli.Option{Module: "all"}
	o.ConfigPath = multipath.New("./testdata")
	o.Registry = module.Registry{"module1": module.Creator{}, "module2": module.Creator{}}
	o.Config.MaxProcs = 1
	o.configName = "test.d.conf.yml"
	assert.True(t, o.Setup())
	assert.Equal(t, o.Config.MaxProcs, runtime.GOMAXPROCS(0))
}
+
// TestOrchestrator_Setup covers module selection: all modules, all
// modules with one disabled-by-default, and a single named module.
func TestOrchestrator_Setup(t *testing.T) {
	// OK all
	o := New()
	o.Name = "test"
	o.Option = &cli.Option{Module: "all"}
	o.ConfigPath = multipath.New("./testdata")
	o.Registry = module.Registry{
		"module1": module.Creator{},
		"module2": module.Creator{},
		"module3": module.Creator{}}
	o.configName = "test.d.conf.yml"
	assert.True(t, o.Setup())
	assert.Len(t, o.modules, 3)

	// OK all with disabled by default (module3 has no explicit
	// config entry, so it stays off)
	o = New()
	o.Name = "test"
	o.Option = &cli.Option{Module: "all"}
	o.ConfigPath = multipath.New("./testdata")
	o.Registry = module.Registry{
		"module1": module.Creator{},
		"module2": module.Creator{},
		"module3": module.Creator{Defaults: module.Defaults{Disabled: true}},
	}
	o.configName = "test.d.conf.yml"
	assert.True(t, o.Setup())
	assert.Len(t, o.modules, 2)

	// OK specific
	o = New()
	o.Name = "test"
	o.Option = &cli.Option{Module: "module2"}
	o.ConfigPath = multipath.New("./testdata")
	o.Registry = module.Registry{"module1": module.Creator{}, "module2": module.Creator{}}
	o.configName = "test.d.conf.yml"
	assert.True(t, o.Setup())
	assert.Len(t, o.modules, 1)

}
diff --git a/start.go b/start.go
new file mode 100644
index 0000000..4e8323d
--- /dev/null
+++ b/start.go
@@ -0,0 +1,69 @@
+package orchestrator
+
+import (
+ "time"
+)
+
+func (o *Orchestrator) jobStartLoop() {
+ started := make(map[string]bool)
+LOOP:
+ for {
+ select {
+ case <-o.jobStartStop:
+ break LOOP
+ case job := <-o.jobStartCh:
+ o.startOnce(job, started)
+ }
+ }
+}
+
// startOnce performs a single job start attempt:
//   - skips the job if another job with the same full name was started;
//   - runs autodetection, dropping the job if it panicked;
//   - schedules a retry when detection failed but retries are allowed;
//   - otherwise starts the job and adds it to the tick loop queue.
//
// The started map is owned by jobStartLoop, so no locking is needed.
func (o *Orchestrator) startOnce(job Job, started map[string]bool) {
	if started[job.FullName()] {
		// NOTE(review): trailing space in this log message.
		log.Warningf("%s[%s]: already served by another job, skipping ", job.ModuleName(), job.Name())
		o.jobsStatuses.remove(job)
		return
	}

	ok := job.AutoDetection()
	if job.Panicked() {
		log.Errorf("%s[%s]: panic during autodetection, skipping", job.ModuleName(), job.Name())
		o.jobsStatuses.remove(job)
		return
	}

	if !ok && job.RetryAutoDetection() {
		// Detection failed but is retryable: re-queue later.
		go recheckTask(o.jobStartCh, job)
		o.jobsStatuses.put(job, "recovering")
		return
	}

	if !ok {
		log.Warningf("%s[%s]: autodetection failed", job.ModuleName(), job.Name())
		o.jobsStatuses.remove(job)
		return
	}

	log.Infof("%s[%s]: autodetection success", job.ModuleName(), job.Name())

	started[job.FullName()] = true
	o.jobsStatuses.put(job, "active")
	go job.Start()
	o.loopQueue.add(job)
}
+
// recheckTask re-queues the job for another detection attempt after its
// retry interval. If the start loop does not accept the job within 30
// seconds (e.g. it has been stopped), the job is dropped.
func recheckTask(ch chan Job, job Job) {
	log.Infof("%s[%s]: scheduling next check in %d seconds",
		job.ModuleName(),
		job.Name(),
		job.AutoDetectionEvery(),
	)
	time.Sleep(time.Second * time.Duration(job.AutoDetectionEvery()))

	t := time.NewTimer(time.Second * 30)
	defer t.Stop()

	select {
	case <-t.C:
	case ch <- job:
	}
}
diff --git a/start_test.go b/start_test.go
new file mode 100644
index 0000000..2ebc605
--- /dev/null
+++ b/start_test.go
@@ -0,0 +1,27 @@
+package orchestrator
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
// Test_jobStartLoop feeds the same job three times and checks that it
// is queued and tracked exactly once.
func Test_jobStartLoop(t *testing.T) {
	o := New()

	go o.jobStartLoop()

	job := &mockJob{}

	o.jobStartCh <- job
	o.jobStartCh <- job
	o.jobStartCh <- job
	o.jobStartStop <- struct{}{}

	assert.Equal(t, 1, len(o.loopQueue.queue))
	assert.Equal(t, 1, len(o.jobsStatuses.items))

	for _, j := range o.loopQueue.queue {
		j.Stop()
	}
}
diff --git a/testdata/god-jobs-statuses.json b/testdata/god-jobs-statuses.json
new file mode 100644
index 0000000..2b6720f
--- /dev/null
+++ b/testdata/god-jobs-statuses.json
@@ -0,0 +1,5 @@
+{
+ "module1": {
+ "module1": "active"
+ }
+} \ No newline at end of file
diff --git a/testdata/test.d.conf-broken.yml b/testdata/test.d.conf-broken.yml
new file mode 100644
index 0000000..58f5237
--- /dev/null
+++ b/testdata/test.d.conf-broken.yml
@@ -0,0 +1,8 @@
+enabled: true
+default_run: yes
+
+modules:
+ module1: yes
+ module2: yes
+
+this is broken
diff --git a/testdata/test.d.conf-disabled.yml b/testdata/test.d.conf-disabled.yml
new file mode 100644
index 0000000..1b412d0
--- /dev/null
+++ b/testdata/test.d.conf-disabled.yml
@@ -0,0 +1,6 @@
+enabled: false
+default_run: yes
+
+modules:
+ module1: yes
+ module2: yes
diff --git a/testdata/test.d.conf-empty.yml b/testdata/test.d.conf-empty.yml
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/testdata/test.d.conf-empty.yml
@@ -0,0 +1 @@
+
diff --git a/testdata/test.d.conf-invalid-modules.yml b/testdata/test.d.conf-invalid-modules.yml
new file mode 100644
index 0000000..19a5523
--- /dev/null
+++ b/testdata/test.d.conf-invalid-modules.yml
@@ -0,0 +1,6 @@
+enabled: true
+default_run: yes
+
+modules:
+module1: yes
+module2: yes
diff --git a/testdata/test.d.conf.yml b/testdata/test.d.conf.yml
new file mode 100644
index 0000000..654f987
--- /dev/null
+++ b/testdata/test.d.conf.yml
@@ -0,0 +1,6 @@
+enabled: true
+default_run: yes
+
+modules:
+ module1: yes
+ module2: yes
diff --git a/testdata/test.d/module-broken.conf b/testdata/test.d/module-broken.conf
new file mode 100644
index 0000000..4c7d3ae
--- /dev/null
+++ b/testdata/test.d/module-broken.conf
@@ -0,0 +1,8 @@
+update_every: 5
+autodetection_retry: 5
+
+jobs:
+- name: job1
+- name: job2
+
+this is broken \ No newline at end of file
diff --git a/testdata/test.d/module-no-jobs.conf b/testdata/test.d/module-no-jobs.conf
new file mode 100644
index 0000000..a5ed513
--- /dev/null
+++ b/testdata/test.d/module-no-jobs.conf
@@ -0,0 +1,4 @@
+update_every: 5
+autodetection_retry: 5
+
+jobs:
diff --git a/testdata/test.d/module1.conf b/testdata/test.d/module1.conf
new file mode 100644
index 0000000..07c5d90
--- /dev/null
+++ b/testdata/test.d/module1.conf
@@ -0,0 +1,7 @@
+update_every: 5
+autodetection_retry: 5
+
+jobs:
+- name: job1
+- name: job2
+- name: job3 \ No newline at end of file
diff --git a/testdata/test.d/module2.conf b/testdata/test.d/module2.conf
new file mode 100644
index 0000000..370a2e9
--- /dev/null
+++ b/testdata/test.d/module2.conf
@@ -0,0 +1,6 @@
+update_every: 5
+autodetection_retry: 5
+
+jobs:
+- name: job1
+- name: job2 \ No newline at end of file
diff --git a/ticker.go b/ticker.go
new file mode 100644
index 0000000..5e3cc81
--- /dev/null
+++ b/ticker.go
@@ -0,0 +1,53 @@
+package orchestrator
+
+import "time"
+
type (
	// Ticker holds a channel that delivers ticks of a clock at intervals.
	// Ticks are aligned to interval boundaries (e.g. whole seconds) and
	// carry the number of ticks delivered so far, not the current time.
	Ticker struct {
		C        <-chan int    // receives the tick counter
		done     chan struct{} // signals Stop to the ticker goroutine
		loops    int           // ticks delivered so far
		interval time.Duration
	}
)
+
// NewTicker returns a new Ticker containing a channel that will send
// the number of ticks delivered so far (starting at 0) with a period
// specified by the duration argument. It adjusts the intervals or drops
// ticks to make up for slow receivers.
// Stop the Ticker to release associated resources.
// (Unlike time.NewTicker, this does not panic on a non-positive
// interval — time.Truncate treats d <= 0 as a no-op.)
func NewTicker(interval time.Duration) *Ticker {
	ticker := &Ticker{
		interval: interval,
		// Buffered so Stop never blocks, even if the goroutine is
		// between boundary sleeps.
		done: make(chan struct{}, 1),
	}
	ticker.start()
	return ticker
}
+
// start launches the ticker goroutine. Each iteration sleeps until the
// next interval boundary, then either delivers the tick counter or, if
// Stop was called, closes the channel and exits.
func (t *Ticker) start() {
	ch := make(chan int)
	t.C = ch
	go func() {
	LOOP:
		for {
			now := time.Now()
			// The next interval boundary after now (e.g. the next whole second).
			nextRun := now.Truncate(t.interval).Add(t.interval)

			time.Sleep(nextRun.Sub(now))
			select {
			case <-t.done:
				close(ch)
				break LOOP
			case ch <- t.loops:
				// loops only advances on delivery, so a slow receiver
				// effectively drops the missed ticks.
				t.loops++
			}
		}
	}()
}
+
// Stop turns off a Ticker. After Stop, no more ticks will be sent.
// The ticker goroutine closes C at the next interval boundary (see
// start), so a pending receive may observe a final zero value —
// the original comment claiming C is never closed was incorrect.
func (t *Ticker) Stop() {
	t.done <- struct{}{}
}
diff --git a/ticker_test.go b/ticker_test.go
new file mode 100644
index 0000000..21eac88
--- /dev/null
+++ b/ticker_test.go
@@ -0,0 +1,47 @@
+package orchestrator
+
+import (
+ "testing"
+ "time"
+)
+
// allowedDelta is the maximum tolerated deviation from exact tick alignment.
var allowedDelta = 50 * time.Millisecond
+
// TestTickerParallel runs many staggered copies of TestTicker
// concurrently to shake out alignment races.
// NOTE(review): it reuses the parent *testing.T inside goroutines and
// synchronizes via a fixed sleep rather than a WaitGroup.
func TestTickerParallel(t *testing.T) {
	for i := 0; i < 100; i++ {
		i := i
		go func() {
			time.Sleep(time.Second / 100 * time.Duration(i))
			TestTicker(t)
		}()
	}
	time.Sleep(4 * time.Second)
}
+
// TestTicker checks that ticks land near whole-second boundaries and
// that consecutive ticks are about one second apart.
func TestTicker(t *testing.T) {
	ticker := NewTicker(time.Second)
	defer ticker.Stop()
	prev := time.Now()
	for i := 0; i < 3; i++ {
		<-ticker.C
		now := time.Now()
		// Distance from the nearest whole second.
		diff := abs(now.Round(time.Second).Sub(now))
		if diff >= allowedDelta {
			t.Errorf("Ticker is not aligned: expect delta < %v but was: %v (%s)", allowedDelta, diff, now.Format(time.RFC3339Nano))
		}
		if i > 0 {
			dt := now.Sub(prev)
			if abs(dt-time.Second) >= allowedDelta {
				t.Errorf("Ticker interval: expect delta < %v ns but was: %v", allowedDelta, abs(dt-time.Second))
			}
		}
		prev = now
	}
}
+
// abs returns the absolute value of the duration.
func abs(a time.Duration) time.Duration {
	if a >= 0 {
		return a
	}
	return -a
}