Documentation ¶
Overview ¶
Code for running Amazon Elastic MapReduce (EMR) streaming jobs.
Index ¶
- Constants
- func AlphaNumFilter(s string) string
- func IdentityMap(ctx MapContext)
- func IdentityReduce(ctx ReduceContext)
- func IntegerSumReduce(ctx ReduceContext)
- func LapackToolChecker(path string, t tool.Interface) error
- func List(ss3 s3.Interface, output *StepLocation, ch chan s3.ListedObject)
- func LoadLines(ss3 s3.Interface, output *StepLocation, f func(string, *KeyValue))
- func LoadLines2(ss3 s3.Interface, output *StepLocation, threads int, decider UrlDeciderFunc, ...)
- func LoadLines3(ss3 s3.Interface, output *StepLocation, threads int, proc FileProcessor)
- func NullChecker(path string, t tool.Interface) error
- func ReassembleLine(kv KeyValue) string
- func SlurpLines(r io.Reader, f func(string)) error
- func StreamUrl(u string, max int, backoff time.Duration) (io.ReadCloser, error)
- func UrlHasExtension(u, ext string) bool
- type Context
- type Count
- type Error
- type FileKeyValue
- type FileProcessor
- type FloatSumReducer
- type Flow
- type FlowListener
- type FlowsResponse
- type IdentityMapperTool
- type IdentityReducerTool
- type KeyValue
- type KeyValueProcessor
- type MapContext
- type MapTool
- type Mapper
- type MonFlow
- type Output
- type ReduceContext
- type ReduceJob
- type ReduceTool
- type Reducer
- type RunFlowResponse
- type ShowFlow
- type Slurper
- type Step
- type StepLocation
- type StepMember
- type Streaming
- type ToolChecker
- type UrlDeciderFunc
Constants ¶
View Source
const TICKER = 60 * time.Second
View Source
const VARS_PREFIX = "EMR_VARS_"
Variables ¶
This section is empty.
Functions ¶
func AlphaNumFilter ¶
func IdentityMap ¶
func IdentityMap(ctx MapContext)
func IdentityReduce ¶
func IdentityReduce(ctx ReduceContext)
func IntegerSumReduce ¶
func IntegerSumReduce(ctx ReduceContext)
func List ¶
func List(ss3 s3.Interface, output *StepLocation, ch chan s3.ListedObject)
Randomizes the order of each listing batch.
func LoadLines2 ¶
func LoadLines2(ss3 s3.Interface, output *StepLocation, threads int, decider UrlDeciderFunc, f func(string, *KeyValue))
func LoadLines3 ¶
func LoadLines3(ss3 s3.Interface, output *StepLocation, threads int, proc FileProcessor)
Enables transactional processing of files.
func ReassembleLine ¶
func SlurpLines ¶
Reads all lines; returns nil on EOF, or an error otherwise.
func UrlHasExtension ¶
Types ¶
type FileKeyValue ¶
type FileProcessor ¶
type FileProcessor interface {
	// ForFile should return a function to process the key/value pairs
	// from the file, or nil if no processing is needed.
	ForFile(url string, size int) KeyValueProcessor
	// Success indicates that the given file was processed successfully.
	Success(url string)
	// Failure is called when processing a file fails; it should return a
	// processor if the file should be retried.
	Failure(url string, size int, err error) KeyValueProcessor
}
Each KeyValueProcessor is called from only a single thread.
type FloatSumReducer ¶
type FloatSumReducer struct {
Format string
}
func (FloatSumReducer) Reduce ¶
func (f FloatSumReducer) Reduce(ctx ReduceContext)
type Flow ¶
type Flow struct {
	IsSpot             bool
	Auth               aws.Auth
	Steps              []Step
	Instances          int
	MasterInstanceType string
	MasterSpotPrice    float64 `json:",omitempty"`
	SlaveInstanceType  string
	SlaveSpotPrice     float64 `json:",omitempty"`
	ScriptBucket       string
	LogBucket          string
	KeepAlive          bool
	KeyName            string
	AvailabilityZone   string
}
type FlowListener ¶
type FlowListener func(id, state string)
type FlowsResponse ¶
type FlowsResponse struct {
	State     string       `xml:"DescribeJobFlowsResult>JobFlows>member>ExecutionStatusDetail>State"`
	MasterDNS string       `xml:"DescribeJobFlowsResult>JobFlows>member>Instances>MasterPublicDnsName"`
	Steps     []StepMember `xml:"DescribeJobFlowsResult>JobFlows>member>Steps>member"`
}
func (*FlowsResponse) GetStep ¶
func (f *FlowsResponse) GetStep(name string) *StepMember
type IdentityMapperTool ¶
func (*IdentityMapperTool) Description ¶
func (m *IdentityMapperTool) Description() string
func (*IdentityMapperTool) MarshalJSON ¶
func (t *IdentityMapperTool) MarshalJSON() ([]byte, error)
func (*IdentityMapperTool) Name ¶
func (m *IdentityMapperTool) Name() string
func (*IdentityMapperTool) Run ¶
func (m *IdentityMapperTool) Run(args []string)
func (*IdentityMapperTool) String ¶
func (m *IdentityMapperTool) String() string
func (*IdentityMapperTool) Tags ¶
func (t *IdentityMapperTool) Tags() []string
type IdentityReducerTool ¶
func (*IdentityReducerTool) Description ¶
func (m *IdentityReducerTool) Description() string
func (*IdentityReducerTool) MarshalJSON ¶
func (t *IdentityReducerTool) MarshalJSON() ([]byte, error)
func (*IdentityReducerTool) Name ¶
func (m *IdentityReducerTool) Name() string
func (*IdentityReducerTool) Run ¶
func (m *IdentityReducerTool) Run(args []string)
func (*IdentityReducerTool) String ¶
func (m *IdentityReducerTool) String() string
func (*IdentityReducerTool) Tags ¶
func (t *IdentityReducerTool) Tags() []string
type KeyValueProcessor ¶
type KeyValueProcessor func(*KeyValue)
type MapTool ¶
type MapTool struct {
// contains filtered or unexported fields
}
func NewHiddenMapTool ¶
func NewMapTool ¶
func (*MapTool) Description ¶
func (*MapTool) MarshalJSON ¶
type Mapper ¶
type Mapper func(ctx MapContext)
type MonFlow ¶
type MonFlow struct {
	Auth map[string]aws.Auth
	FlowListener
}
func NewMonFlow ¶
func NewMonFlow(a map[string]aws.Auth, l FlowListener) *MonFlow
type ReduceContext ¶
type ReduceTool ¶
type ReduceTool struct {
// contains filtered or unexported fields
}
func NewHiddenReduceTool ¶
func NewHiddenReduceTool(r Reducer, name, description string) *ReduceTool
func NewReduceTool ¶
func NewReduceTool(r Reducer, name, description string) *ReduceTool
func (*ReduceTool) Description ¶
func (m *ReduceTool) Description() string
func (*ReduceTool) MarshalJSON ¶
func (t *ReduceTool) MarshalJSON() ([]byte, error)
func (*ReduceTool) Name ¶
func (m *ReduceTool) Name() string
func (*ReduceTool) Run ¶
func (m *ReduceTool) Run(args []string)
func (*ReduceTool) String ¶
func (m *ReduceTool) String() string
func (*ReduceTool) Tags ¶
func (t *ReduceTool) Tags() []string
type Reducer ¶
type Reducer func(ctx ReduceContext)
type RunFlowResponse ¶
type RunFlowResponse struct {
FlowId string `xml:"RunJobFlowResult>JobFlowId"`
}
func ParseEmrResponse ¶
func ParseEmrResponse(r io.Reader) (*RunFlowResponse, error)
func Run ¶
func Run(flow Flow) (*RunFlowResponse, error)
type Step ¶
type Step struct {
	Name               string
	Inputs             []string
	Output             string
	Reducers           int           `json:",omitempty"`
	Timeout            time.Duration `json:",omitempty"`
	Mapper, Reducer    Streaming
	Compress           bool              `json:",omitempty"`
	CompressMapOutput  bool              `json:",omitempty"`
	SortSecondKeyField bool              `json:",omitempty"`
	ToolChecker        ToolChecker       `json:",omitempty"`
	Vars               map[string]string `json:",omitempty"`
	// Additional arguments for the streaming command.
	Args []string `json:",omitempty"`
	// Important: determines whether the input files are lists of URLs or not.
	IndirectMapJob bool `json:",omitempty"`
}
type StepLocation ¶
type StepMember ¶
type StepMember struct {
	Name string   `xml:"StepConfig>Name"`
	Args []string `xml:"StepConfig>HadoopJarStep>Args>member"`
}
func (*StepMember) ExtractVars ¶
func (f *StepMember) ExtractVars() map[string]string
func (*StepMember) Input ¶
func (s *StepMember) Input() *StepLocation
func (*StepMember) Output ¶
func (s *StepMember) Output() *StepLocation
type Streaming ¶
The Streaming interface is essentially tool.Interface, plus a private method that ensures no code outside this package can implement it. In other words, this package controls which kinds of tools can serve as mappers and reducers.
type ToolChecker ¶
Returns false if the tool somehow doesn't check out.
func (*ToolChecker) MarshalJSON ¶
func (t *ToolChecker) MarshalJSON() ([]byte, error)
type UrlDeciderFunc ¶
Click to show internal directories.
Click to hide internal directories.