| Safe Haskell | None |
| --- | --- |
| Language | Haskell2010 |
Database.Kafkar
Contents
Description
Read-only support for the Kafka on-disk format, no broker required!
This module provides access to Kafka streams which are accessible directly on the local disk, without requiring a Kafka server. It contains a parser for the Kafka on-disk log format, some logic for seeking using Kafka's index files, and some logic for extracting compressed streams. The idea is to provide an interface which is similar to the one exposed by a real Kafka server, so that it shouldn't be too tricky to transparently switch between online and offline access to your Kafka data.
Usage
```haskell
{-# LANGUAGE ScopedTypeVariables #-}

import Database.Kafkar
import Pipes
import Pipes.Safe
import qualified Pipes.Prelude as P

main :: IO ()
main = do
    -- Read in and parse all index files for "myTopic"
    topic <- loadTopic "/var/lib/kafka" "myTopic" 0

    -- Read messages 1000-1009 from the topic.
    msgs :: [MessageEntry] <- runSafeT $ P.toListM $
        readTopic topic (Offset 1000) >-> P.take 10

    -- Stream messages from the topic, starting at the beginning of
    -- the stream, and pretty-print them to stdout
    runSafeT $ runEffect $
        readTopic topic (Offset 0) >-> P.map ppMessage >-> P.stdoutLn
```
Operational properties
Memory use is constant-ish, but beware:
- In the case of uncompressed streams, it depends on the size of the messages;
- In the case of compressed streams, it depends on the uncompressed size of a batch of messages;
- All indices are loaded into memory. The amount of memory required for this depends on the density of the indices (configurable) and the total size of the stream in bytes. This behaviour could be improved by only loading indices when seeking, and releasing them when the seek is complete.
Files are opened only when needed and are closed promptly. When streaming across a segment boundary, for instance, the first segment's log file is closed as the second segment's log file is opened.
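To make these streaming properties concrete, here is a minimal sketch that folds over an entire partition in roughly constant memory. The `countMessages` helper is invented for illustration and is not part of this package:

```haskell
import Database.Kafkar
import Pipes.Safe (runSafeT)
import qualified Pipes.Prelude as P

-- Hypothetical helper: count every message in a partition. Only one
-- message (or one decompressed batch) needs to be resident at a time,
-- and each segment's log file is opened and closed as the stream
-- crosses it.
countMessages :: FilePath -> String -> Int -> IO Int
countMessages logDir topicName partition = do
    topic <- loadTopic logDir topicName partition
    runSafeT $ P.length (readTopic topic (Offset 0))
```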
Is this a good idea?
The Kafka on-disk format has never been advertised as a stable, portable format. It is essentially undocumented; writing this package required some mild reverse-engineering. In light of this, is it a good idea to try to read these things directly?
Kafka makes the clever design decision of keeping data on-disk in exactly the format which it must be sent in. That is, it sits on the disk, compressed and annotated with metadata, all ready to be sent to a client. This means that Kafka can respond to data requests with a simple (and fast) call to `sendFile()`.
The upshot of this is that the on-disk format must be as stable as the wire format. Since this is a well-documented, properly versioned format which comes with commitments to backward-compatibility, it's safe to say that the on-disk format is likewise stable (barring major changes to Kafka's design).
I should note that the above argument applies only to the format of the log data. The index files are not exposed by any public interface, and so may change; if they do, seeking will break.
Other caveats
- Kafka only flushes its logs to disk periodically. This means that new messages in a topic will be absent from the on-disk logs for some amount of time.
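One way to work around this is to poll: re-read from the last offset you have seen, and sleep between attempts. Below is a rough sketch, assuming that `readTopic` simply produces nothing when the requested offset is at or past the current end of the log; the `followTopic` helper is not part of this package:

```haskell
import Control.Concurrent (threadDelay)
import Data.Int (Int64)
import Pipes.Safe (runSafeT)
import qualified Pipes.Prelude as P
import Database.Kafkar

-- Hypothetical tail-follow loop: repeatedly re-read from the last offset
-- seen, sleeping between polls to give Kafka time to flush new messages.
followTopic :: Topic -> Int64 -> (MessageEntry -> IO ()) -> IO a
followTopic topic start handler = go start
  where
    go next = do
        newMsgs <- runSafeT $ P.toListM $ readTopic topic (Offset next)
        mapM_ handler newMsgs
        threadDelay 1000000  -- wait a second before polling again
        go (next + fromIntegral (length newMsgs))
```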
- loadTopic :: FilePath -> String -> Int -> IO Topic
- readTopic :: MonadSafe m => Topic -> Offset -> Producer MessageEntry m ()
- data MessageEntry = MessageEntry {}
- data Message = Message { attributes :: !Attributes, key :: !(Maybe ByteString), value :: !(Maybe ByteString) }
- data Attributes = Attributes { compression :: !Codec }
- data Codec
- newtype Offset = Offset Int64
- ppMessage :: MessageEntry -> String
Core API
loadTopic Source

Arguments
:: FilePath | Kafka log directory (eg. /var/lib/kafka) |
-> String | Topic name |
-> Int | Partition number |
-> IO Topic |
Create a handle for a topic/partition stored on the filesystem.
This function will read all index files for the given topic into memory (~10KB per segment). It will not read any log data. The returned value should not contain any thunks.
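Since a Topic handle covers a single topic/partition pair, reading a multi-partition topic means one `loadTopic` call per partition. A small sketch; the helper name, and the assumption that the caller already knows the partition count, are mine:

```haskell
import Database.Kafkar

-- Hypothetical helper: load handles for partitions 0..n-1 of a topic.
loadAllPartitions :: FilePath -> String -> Int -> IO [Topic]
loadAllPartitions logDir topicName numPartitions =
    mapM (loadTopic logDir topicName) [0 .. numPartitions - 1]
```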
readTopic Source

Arguments
:: MonadSafe m | |
=> Topic | Topic handle to read from |
-> Offset | Offset to begin reading at, given as a number of messages since the start of the topic. |
-> Producer MessageEntry m () |
Stream messages from the given topic, starting at the given offset.
This function uses the Kafka index to perform an efficient seek to the desired offset. Given that the index is in memory, a seek should require reading a bounded number of bytes (determined by the density of the index).
It then streams messages from the log file, parsing and decompressing them. The `compression` attribute should always be `None` for messages returned by this function.
Kafka logs are stored in multiple segments. This function should only hold a read handle for one segment file at a time.
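As a usage sketch, the index-based seek makes it cheap to pull a small window of messages from deep inside a large topic. `printWindow` is an invented name, not part of this module:

```haskell
import Data.Int (Int64)
import Pipes
import Pipes.Safe (runSafeT)
import qualified Pipes.Prelude as P
import Database.Kafkar

-- A minimal sketch: seek to the given logical offset, take n messages,
-- and pretty-print them to stdout.
printWindow :: Topic -> Int64 -> Int -> IO ()
printWindow topic start n =
    runSafeT $ runEffect $
        readTopic topic (Offset start)
            >-> P.take n
            >-> P.map ppMessage
            >-> P.stdoutLn
```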
Types
data MessageEntry Source

data Message Source

Constructors

Message

Fields
- attributes :: !Attributes
- key :: !(Maybe ByteString)
- value :: !(Maybe ByteString)

data Attributes Source

Constructors

Attributes

Fields
- compression :: !Codec

data Codec Source

Message compression algorithm

newtype Offset Source

Constructors

Offset Int64

The logical offset of a message within a topic
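As a small example of working with these records, the following sketch renders a Message's key and value for quick inspection, assuming the bytes happen to be printable ASCII. `showKeyValue` is an invented helper:

```haskell
import qualified Data.ByteString.Char8 as BS8
import Database.Kafkar

-- Render a message's key and value as ASCII, falling back to
-- placeholders when either is absent.
showKeyValue :: Message -> String
showKeyValue msg =
    maybe "<no key>" BS8.unpack (key msg)
        ++ " -> "
        ++ maybe "<null>" BS8.unpack (value msg)
```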
Pretty-printing
ppMessage :: MessageEntry -> String Source
Pretty-print a message in the format `<offset>: [key:] <value>`
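For instance (with made-up values), a message at offset 1000 whose key is `user-42` would presumably render as `1000: user-42: <value>`, with the bracketed key segment omitted when the message has no key.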